You are viewing a plain-text version of this content. The canonical link for it was not preserved in this copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 01:46:01 UTC

[06/11] beam git commit: Removes AvroIO.Read.Bound

Removes AvroIO.Read.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1499d256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1499d256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1499d256

Branch: refs/heads/master
Commit: 1499d256c616e34b4416fa202a45aa256ac88d20
Parents: 0166e19
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:19:21 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 222 +++++++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  24 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  18 +-
 4 files changed, 108 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 2a73c28..e3a44d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
 
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
-        AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+        AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 75e14d5..abde9cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -63,7 +63,7 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // A simple Read of a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.Read.from("/path/to/file.avro")
+ *     p.apply(AvroIO.read().from("/path/to/file.avro")
  *                 .withSchema(AvroAutoGenClass.class));
  *
  * // A Read from a GCS file (runs locally and using remote execution):
@@ -125,15 +125,39 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class AvroIO {
   /**
-   * A root {@link PTransform} that reads from an Avro file (or multiple Avro
-   * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each record.
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
-  public static class Read {
+  public static <T> Read<T> read() {
+    return new Read<>();
+  }
+
+  /** Implementation of {@link #read}. */
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    /** The filepattern to read from. */
+    @Nullable
+    final String filepattern;
+    /** The class type of the records. */
+    @Nullable
+    final Class<T> type;
+    /** The schema of the input file. */
+    @Nullable
+    final Schema schema;
+
+    Read() {
+      this(null, null, null, null);
+    }
+
+    Read(String name, String filepattern, Class<T> type, Schema schema) {
+      super(name);
+      this.filepattern = filepattern;
+      this.type = type;
+      this.schema = schema;
+    }
 
     /**
-     * Returns a {@link PTransform} that reads from the file(s)
-     * with the given name or pattern. This can be a local filename
+     * Reads from the file(s) with the given name or pattern. This can be a local filename
      * or filename pattern (if running locally), or a Google Cloud
      * Storage filename or filename pattern of the form
      * {@code "gs://<bucket>/<filepath>"} (if running locally or
@@ -141,162 +165,82 @@ public class AvroIO {
      * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
-    public static Bound<GenericRecord> from(String filepattern) {
-      return new Bound<>(GenericRecord.class).from(filepattern);
+    public Read<T> from(String filepattern) {
+      return new Read<>(name, filepattern, type, schema);
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records whose type is the specified Avro-generated class.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting {@link PCollection}
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records whose type is the
+     * specified Avro-generated class.
      */
-    public static <T> Bound<T> withSchema(Class<T> type) {
-      return new Bound<>(type).withSchema(type);
+    public Read<T> withSchema(Class<T> type) {
+      return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records of the specified schema.
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records of the specified schema.
      */
-    public static Bound<GenericRecord> withSchema(Schema schema) {
-      return new Bound<>(GenericRecord.class).withSchema(schema);
+    public Read<GenericRecord> withSchema(Schema schema) {
+      return new Read<>(name, filepattern, GenericRecord.class, schema);
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records of the specified schema in a JSON-encoded
-     * string form.
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records of the specified schema
+     * in a JSON-encoded string form.
+     *
+     * <p>Does not modify this object.
      */
-    public static Bound<GenericRecord> withSchema(String schema) {
+    public Read<GenericRecord> withSchema(String schema) {
       return withSchema((new Schema.Parser()).parse(schema));
     }
 
-    /**
-     * A {@link PTransform} that reads from an Avro file (or multiple Avro
-     * files matching a pattern) and returns a bounded {@link PCollection} containing
-     * the decoding of each record.
-     *
-     * @param <T> the type of each of the elements of the resulting
-     * PCollection
-     */
-    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
-      /** The filepattern to read from. */
-      @Nullable
-      final String filepattern;
-      /** The class type of the records. */
-      final Class<T> type;
-      /** The schema of the input file. */
-      @Nullable
-      final Schema schema;
-
-      Bound(Class<T> type) {
-        this(null, null, type, null);
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (filepattern == null) {
+        throw new IllegalStateException(
+            "need to set the filepattern of an AvroIO.Read transform");
       }
-
-      Bound(String name, String filepattern, Class<T> type, Schema schema) {
-        super(name);
-        this.filepattern = filepattern;
-        this.type = type;
-        this.schema = schema;
+      if (schema == null) {
+        throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
       }
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads from the file(s) with the given name or pattern.
-       * (See {@link AvroIO.Read#from} for a description of
-       * filepatterns.)
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> from(String filepattern) {
-        return new Bound<>(name, filepattern, type, schema);
-      }
+      @SuppressWarnings("unchecked")
+      Bounded<T> read =
+          type == GenericRecord.class
+              ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
+                  AvroSource.from(filepattern).withSchema(schema))
+              : org.apache.beam.sdk.io.Read.from(
+                  AvroSource.from(filepattern).withSchema(type));
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records whose type is the
-       * specified Avro-generated class.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements and the elements of
-       * the resulting PCollection
-       */
-      public <X> Bound<X> withSchema(Class<X> type) {
-        return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type));
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records of the specified schema.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(Schema schema) {
-        return new Bound<>(name, filepattern, GenericRecord.class, schema);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records of the specified schema
-       * in a JSON-encoded string form.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(String schema) {
-        return withSchema((new Schema.Parser()).parse(schema));
-      }
-
-      @Override
-      public PCollection<T> expand(PBegin input) {
-        if (filepattern == null) {
-          throw new IllegalStateException(
-              "need to set the filepattern of an AvroIO.Read transform");
-        }
-        if (schema == null) {
-          throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
-        }
-
-        @SuppressWarnings("unchecked")
-        Bounded<T> read =
-            type == GenericRecord.class
-                ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                    AvroSource.from(filepattern).withSchema(schema))
-                : org.apache.beam.sdk.io.Read.from(
-                    AvroSource.from(filepattern).withSchema(type));
-
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-          .addIfNotNull(DisplayData.item("filePattern", filepattern)
-            .withLabel("Input File Pattern"));
-      }
+      PCollection<T> pcol = input.getPipeline().apply("Read", read);
+      // Honor the default output coder that would have been used by this PTransform.
+      pcol.setCoder(getDefaultOutputCoder());
+      return pcol;
+    }
 
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return AvroCoder.of(type, schema);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+        .addIfNotNull(DisplayData.item("filePattern", filepattern)
+          .withLabel("Input File Pattern"));
+    }
 
-      public String getFilepattern() {
-        return filepattern;
-      }
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return AvroCoder.of(type, schema);
+    }
 
-      public Schema getSchema() {
-        return schema;
-      }
+    public String getFilepattern() {
+      return filepattern;
     }
 
-    /** Disallow construction of utility class. */
-    private Read() {}
+    public Schema getSchema() {
+      return schema;
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index ece7997..6d842b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -103,7 +103,7 @@ public class AvroIOTest {
 
   @Test
   public void testAvroIOGetName() {
-    assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
+    assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
     assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
   }
 
@@ -150,8 +150,11 @@ public class AvroIOTest {
           .withSchema(GenericClass.class));
     p.run();
 
-    PCollection<GenericClass> input = p
-        .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
+    PCollection<GenericClass> input =
+        p.apply(
+            AvroIO.<GenericClass>read()
+                .from(outputFile.getAbsolutePath())
+                .withSchema(GenericClass.class));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -173,7 +176,7 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.Read
+        .apply(AvroIO.<GenericClass>read()
             .from(outputFile.getAbsolutePath())
             .withSchema(GenericClass.class));
 
@@ -200,7 +203,7 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.Read
+        .apply(AvroIO.<GenericClass>read()
             .from(outputFile.getAbsolutePath())
             .withSchema(GenericClass.class));
 
@@ -269,8 +272,11 @@ public class AvroIOTest {
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
         new GenericClassV2(5, "bar", null));
 
-    PCollection<GenericClassV2> input = p
-        .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
+    PCollection<GenericClassV2> input =
+        p.apply(
+            AvroIO.<GenericClassV2>read()
+                .from(outputFile.getAbsolutePath())
+                .withSchema(GenericClassV2.class));
 
     PAssert.that(input).containsInAnyOrder(expected);
     p.run();
@@ -533,7 +539,7 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*");
+    AvroIO.Read<?> read = AvroIO.read().from("foo.*");
 
     DisplayData displayData = DisplayData.from(read);
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -544,7 +550,7 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+    AvroIO.Read<?> read = AvroIO.read().from("foo.*")
         .withSchema(Schema.create(Schema.Type.STRING));
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 3cf52a4..06b9841 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -138,13 +138,13 @@ public class AvroIOTransformTest {
     }
 
     private <T> void runTestRead(@Nullable final String applyName,
-                                 final AvroIO.Read.Bound<T> readBuilder,
+                                 final AvroIO.Read<T> readBuilder,
                                  final String expectedName,
                                  final T[] expectedOutput) throws Exception {
 
       final File avroFile = tmpFolder.newFile("file.avro");
       generateAvroFile(generateAvroObjects(), avroFile);
-      final AvroIO.Read.Bound<T> read = readBuilder.from(avroFile.getPath());
+      final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath());
       final PCollection<T> output =
           applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, read);
 
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
                   // test read using generated class
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(AvroGeneratedUser.class),
+                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
                       "AvroIO.Read/Read.out",
                       generateAvroObjects(),
                       generatedClass
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(AvroGeneratedUser.class),
+                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
                       "MyRead/Read.out",
                       generateAvroObjects(),
                       generatedClass
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
                   // test read using schema object
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(SCHEMA),
+                      AvroIO.read().withSchema(SCHEMA),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(SCHEMA),
+                      AvroIO.read().withSchema(SCHEMA),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
                   // test read using schema string
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(SCHEMA_STRING),
+                      AvroIO.read().withSchema(SCHEMA_STRING),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(SCHEMA_STRING),
+                      AvroIO.read().withSchema(SCHEMA_STRING),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
@@ -221,7 +221,7 @@ public class AvroIOTransformTest {
     public String transformName;
 
     @Parameterized.Parameter(1)
-    public AvroIO.Read.Bound readTransform;
+    public AvroIO.Read readTransform;
 
     @Parameterized.Parameter(2)
     public String expectedReadTransformName;