You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:36 UTC
[11/50] [abbrv] incubator-beam git commit: Remove some HadoopIO.Read.Bound factory methods and fluent setters; always set key/value at creation
Remove some HadoopIO.Read.Bound factory methods and fluent setters; always set key/value at creation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b47a8d0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b47a8d0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b47a8d0a
Branch: refs/heads/master
Commit: b47a8d0a31f8f160a26cd5b41b3317857969f66a
Parents: b2f495e
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Jul 6 12:23:07 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000
----------------------------------------------------------------------
.../com/cloudera/dataflow/hadoop/HadoopIO.java | 30 +++++++-------------
.../spark/HadoopFileFormatPipelineTest.java | 4 +--
2 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
index 803c495..587e66e 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -32,8 +32,9 @@ public final class HadoopIO {
private Read() {
}
- public static <K, V> Bound<K, V> withKeyValueClass(Class<K> key, Class<V> value) {
- return new Bound<>(null, null, key, value);
+ public static <K, V> Bound<K, V> from(String filepattern, Class<? extends FileInputFormat<K, V>> format,
+ Class<K> key, Class<V> value) {
+ return new Bound<>(filepattern, format, key, value);
}
public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
@@ -45,20 +46,20 @@ public final class HadoopIO {
Bound(String filepattern, Class<? extends FileInputFormat<K, V>> format, Class<K> key,
Class<V> value) {
+ Preconditions.checkNotNull(filepattern,
+ "need to set the filepattern of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(format,
+ "need to set the format class of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(key,
+ "need to set the key class of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(value,
+ "need to set the value class of an HadoopIO.Read transform");
this.filepattern = filepattern;
this.formatClass = format;
this.keyClass = key;
this.valueClass = value;
}
- public Bound<K, V> from(String file) {
- return new Bound<>(file, formatClass, keyClass, valueClass);
- }
-
- public Bound<K, V> withFormatClass(Class<? extends FileInputFormat<K, V>> format) {
- return new Bound<>(filepattern, format, keyClass, valueClass);
- }
-
public String getFilepattern() {
return filepattern;
}
@@ -77,15 +78,6 @@ public final class HadoopIO {
@Override
public PCollection<KV<K, V>> apply(PInput input) {
- Preconditions.checkNotNull(filepattern,
- "need to set the filepattern of an HadoopIO.Read transform");
- Preconditions.checkNotNull(formatClass,
- "need to set the format class of an HadoopIO.Read transform");
- Preconditions.checkNotNull(keyClass,
- "need to set the key class of an HadoopIO.Read transform");
- Preconditions.checkNotNull(valueClass,
- "need to set the value class of an HadoopIO.Read transform");
-
return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
index 127d58f..ba6f7b0 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
@@ -68,9 +68,7 @@ public class HadoopFileFormatPipelineTest {
Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
(Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
HadoopIO.Read.Bound<IntWritable,Text> bound =
- HadoopIO.Read.withKeyValueClass(IntWritable.class, Text.class).
- from(inputFile.getAbsolutePath())
- .withFormatClass(inputFormatClass);
+ HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
PCollection<KV<IntWritable, Text>> input = p.apply(bound);
input.apply(ParDo.of(new TabSeparatedString()))
.apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());