You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/03/10 21:58:36 UTC

[11/50] [abbrv] incubator-beam git commit: Remove some HadoopIO.Read.Bound factory methods and fluent setters; always set key/value at creation

Remove some HadoopIO.Read.Bound factory methods and fluent setters; always set key/value at creation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b47a8d0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b47a8d0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b47a8d0a

Branch: refs/heads/master
Commit: b47a8d0a31f8f160a26cd5b41b3317857969f66a
Parents: b2f495e
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Jul 6 12:23:07 2015 +0100
Committer: Tom White <to...@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000

----------------------------------------------------------------------
 .../com/cloudera/dataflow/hadoop/HadoopIO.java  | 30 +++++++-------------
 .../spark/HadoopFileFormatPipelineTest.java     |  4 +--
 2 files changed, 12 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
index 803c495..587e66e 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -32,8 +32,9 @@ public final class HadoopIO {
     private Read() {
     }
 
-    public static <K, V> Bound<K, V> withKeyValueClass(Class<K> key, Class<V> value) {
-      return new Bound<>(null, null, key, value);
+    public static <K, V> Bound<K, V> from(String filepattern, Class<? extends FileInputFormat<K, V>> format,
+        Class<K> key, Class<V> value) {
+      return new Bound<>(filepattern, format, key, value);
     }
 
     public static class Bound<K, V> extends PTransform<PInput, PCollection<KV<K, V>>> {
@@ -45,20 +46,20 @@ public final class HadoopIO {
 
       Bound(String filepattern, Class<? extends FileInputFormat<K, V>> format, Class<K> key,
           Class<V> value) {
+        Preconditions.checkNotNull(filepattern,
+                                   "need to set the filepattern of an HadoopIO.Read transform");
+        Preconditions.checkNotNull(format,
+                                   "need to set the format class of an HadoopIO.Read transform");
+        Preconditions.checkNotNull(key,
+                                   "need to set the key class of an HadoopIO.Read transform");
+        Preconditions.checkNotNull(value,
+                                   "need to set the value class of an HadoopIO.Read transform");
         this.filepattern = filepattern;
         this.formatClass = format;
         this.keyClass = key;
         this.valueClass = value;
       }
 
-      public Bound<K, V> from(String file) {
-        return new Bound<>(file, formatClass, keyClass, valueClass);
-      }
-
-      public Bound<K, V> withFormatClass(Class<? extends FileInputFormat<K, V>> format) {
-        return new Bound<>(filepattern, format, keyClass, valueClass);
-      }
-
       public String getFilepattern() {
         return filepattern;
       }
@@ -77,15 +78,6 @@ public final class HadoopIO {
 
       @Override
       public PCollection<KV<K, V>> apply(PInput input) {
-        Preconditions.checkNotNull(filepattern,
-            "need to set the filepattern of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(formatClass,
-            "need to set the format class of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(keyClass,
-            "need to set the key class of an HadoopIO.Read transform");
-        Preconditions.checkNotNull(valueClass,
-            "need to set the value class of an HadoopIO.Read transform");
-
         return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
             WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b47a8d0a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
index 127d58f..ba6f7b0 100644
--- a/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/com/cloudera/dataflow/spark/HadoopFileFormatPipelineTest.java
@@ -68,9 +68,7 @@ public class HadoopFileFormatPipelineTest {
     Class<? extends FileInputFormat<IntWritable, Text>> inputFormatClass =
         (Class<? extends FileInputFormat<IntWritable, Text>>) (Class<?>) SequenceFileInputFormat.class;
     HadoopIO.Read.Bound<IntWritable,Text> bound =
-        HadoopIO.Read.withKeyValueClass(IntWritable.class, Text.class).
-        from(inputFile.getAbsolutePath())
-        .withFormatClass(inputFormatClass);
+        HadoopIO.Read.from(inputFile.getAbsolutePath(), inputFormatClass, IntWritable.class, Text.class);
     PCollection<KV<IntWritable, Text>> input = p.apply(bound);
     input.apply(ParDo.of(new TabSeparatedString()))
         .apply(TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());