You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 01:45:56 UTC

[01/11] beam git commit: Scattered minor improvements per review comments

Repository: beam
Updated Branches:
  refs/heads/master 6d443bc39 -> 034565c68


Scattered minor improvements per review comments


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/caf2faeb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/caf2faeb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/caf2faeb

Branch: refs/heads/master
Commit: caf2faeb5e0b173f4e40f4af70c14d1d5d4244e4
Parents: 27d7462
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 1 17:00:46 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 81 +++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroSink.java   | 14 +---
 .../java/org/apache/beam/sdk/io/AvroSource.java |  4 +-
 .../beam/sdk/testing/SourceTestUtils.java       |  5 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 44 ++++++-----
 5 files changed, 69 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 6b66a98..755cdb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -33,6 +33,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -45,11 +46,9 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use
- * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()} and
+ * call {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
+ * See {@link FileSystems} for information on supported file systems and filepatterns.
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
  * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
@@ -72,13 +71,12 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * } </pre>
  *
- * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)}
- * can also be used to specify a custom file naming policy.
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write} and call
+ * {@code AvroIO.write().to(String)} to specify the filename or sharded filepattern to write to.
+ * See {@link FileSystems} for information on supported file systems and {@link ShardNameTemplate}
+ * for information on naming of output files. You can also use {@code AvroIO.write()} with
+ * {@link Write#to(FileBasedSink.FilenamePolicy)} to
+ * specify a custom file naming policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
@@ -140,7 +138,8 @@ public class AvroIO {
   }
 
   /**
-   * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
+   * JSON-encoded string.
    */
   public static Read<GenericRecord> readGenericRecords(String schema) {
     return readGenericRecords(new Schema.Parser().parse(schema));
@@ -165,6 +164,13 @@ public class AvroIO {
         .build();
   }
 
+  /**
+   * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
+  }
+
   private static <T> Write.Builder<T> defaultWriteBuilder() {
     return new AutoValue_AvroIO_Write.Builder<T>()
         .setFilenameSuffix("")
@@ -175,13 +181,6 @@ public class AvroIO {
         .setWindowedWrites(false);
   }
 
-  /**
-   * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
-   */
-  public static Write<GenericRecord> writeGenericRecords(String schema) {
-    return writeGenericRecords(new Schema.Parser().parse(schema));
-  }
-
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -200,15 +199,7 @@ public class AvroIO {
       abstract Read<T> build();
     }
 
-    /**
-     * Reads from the file(s) with the given name or pattern. This can be a local filename
-     * or filename pattern (if running locally), or a Google Cloud
-     * Storage filename or filename pattern of the form
-     * {@code "gs://<bucket>/<filepath>"} (if running locally or
-     * using remote execution). Standard
-     * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
-     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
+    /** Reads from the given filename or filepattern. */
     public Read<T> from(String filepattern) {
       return toBuilder().setFilepattern(filepattern).build();
     }
@@ -275,7 +266,7 @@ public class AvroIO {
     abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
-    @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
+    @Nullable abstract FilenamePolicy getFilenamePolicy();
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -295,7 +286,7 @@ public class AvroIO {
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setWindowedWrites(boolean windowedWrites);
-      abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy);
+      abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
       abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
       abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
 
@@ -303,10 +294,8 @@ public class AvroIO {
     }
 
     /**
-     * Writes to the file(s) with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes to the file(s) with the given prefix. See {@link FileSystems} for information on
+     * supported file systems.
      *
      * <p>The files written will begin with this prefix, followed by
      * a shard identifier (see {@link #withNumShards}, and end
@@ -318,7 +307,7 @@ public class AvroIO {
     }
 
     /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
-    public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+    public Write<T> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
@@ -333,7 +322,8 @@ public class AvroIO {
     }
 
     /**
-     * Uses the provided shard count.
+     * Uses the provided shard count. See {@link ShardNameTemplate} for a description of shard
+     * templates.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -341,19 +331,13 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
-     * @see ShardNameTemplate
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that uses the given shard name template.
-     *
-     * @see ShardNameTemplate
-     */
+    /** Uses the given {@link ShardNameTemplate} for naming output files. */
     public Write<T> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
@@ -361,12 +345,19 @@ public class AvroIO {
     /**
      * Forces a single file as output.
      *
-     * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")}
+     * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}.
      */
     public Write<T> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
+    /**
+     * Preserves windowing of input elements and writes them to files based on the element's window.
+     *
+     * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated
+     * using {@link FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}.
+     * See also {@link WriteFiles#withWindowedWrites()}.
+     */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 16f233c..46bb4f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -30,9 +30,7 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.MimeTypes;
 
-/**
- * A {@link FileBasedSink} for Avro files.
- */
+/** A {@link FileBasedSink} for Avro files. */
 class AvroSink<T> extends FileBasedSink<T> {
   private final AvroCoder<T> coder;
   private final SerializableAvroCodecFactory codec;
@@ -67,10 +65,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     return new AvroWriteOperation<>(this, coder, codec, metadata);
   }
 
-  /**
-   * A {@link FileBasedWriteOperation
-   * FileBasedWriteOperation} for Avro files.
-   */
+  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro files. */
   private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
@@ -92,10 +87,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
   }
 
-  /**
-   * A {@link FileBasedWriter FileBasedWriter}
-   * for Avro files.
-   */
+  /** A {@link FileBasedWriter FileBasedWriter} for Avro files. */
   private static class AvroWriter<T> extends FileBasedWriter<T> {
     private final AvroCoder<T> coder;
     private DataFileWriter<T> dataFileWriter;

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 58e6555..96d21c6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -62,7 +62,9 @@ import org.apache.commons.compress.utils.CountingInputStream;
 
 // CHECKSTYLE.OFF: JavadocStyle
 /**
- * A {@link FileBasedSource} for reading Avro files.
+ * Do not use in pipelines directly: most users should use {@link AvroIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} for reading Avro files.
  *
  * <p>To read a {@link PCollection} of objects from one or more Avro files, use
  * {@link AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
index fd7ae85..cde0b94 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/SourceTestUtils.java
@@ -72,9 +72,8 @@ import org.slf4j.LoggerFactory;
  *   as a heavy-weight stress test including concurrency. We strongly recommend to
  *   use both.
  * </ul>
- * For example usages, see the unit tests of classes such as
- * {@link org.apache.beam.sdk.io.AvroSource} or
- * {@link org.apache.beam.sdk.io.TextIO TextIO.TextSource}.
+ * For example usages, see the unit tests of classes such as {@code AvroSource} or
+ * {@code TextSource}.
  *
  * <p>Like {@link PAssert}, requires JUnit and Hamcrest to be present in the classpath.
  */

http://git-wip-us.apache.org/repos/asf/beam/blob/caf2faeb/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index e421b96..d14d9b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,6 +45,7 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -143,8 +144,9 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-          .withoutSharding());
+     .apply(AvroIO.write(GenericClass.class)
+         .to(outputFile.getAbsolutePath())
+         .withoutSharding());
     p.run();
 
     PCollection<GenericClass> input =
@@ -165,9 +167,10 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-            .withoutSharding()
-            .withCodec(CodecFactory.deflateCodec(9)));
+     .apply(AvroIO.write(GenericClass.class)
+         .to(outputFile.getAbsolutePath())
+         .withoutSharding()
+         .withCodec(CodecFactory.deflateCodec(9)));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -190,9 +193,10 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
-            .withoutSharding()
-            .withCodec(CodecFactory.nullCodec()));
+      .apply(AvroIO.write(GenericClass.class)
+          .to(outputFile.getAbsolutePath())
+          .withoutSharding()
+          .withCodec(CodecFactory.nullCodec()));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -256,7 +260,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+      .apply(AvroIO.write(GenericClass.class)
+          .to(outputFile.getAbsolutePath())
           .withoutSharding());
     p.run();
 
@@ -362,7 +367,8 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.write(GenericClass.class)
+            .to(new WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
             .withNumShards(2));
     windowedAvroWritePipeline.run();
@@ -403,7 +409,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<?> write = AvroIO.write(String.class)
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -442,7 +448,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+        .apply(AvroIO.write(GenericClass.class)
+            .to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withMetadata(ImmutableMap.<String, Object>of(
                 "stringKey", "stringValue",
@@ -463,8 +470,7 @@ public class AvroIOTest {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
-    AvroIO.Write<String> write =
-        AvroIO.write(String.class).to(outputFilePrefix);
+    AvroIO.Write<String> write = AvroIO.write(String.class).to(outputFilePrefix);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -524,7 +530,7 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
+    AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
 
     DisplayData displayData = DisplayData.from(read);
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -535,7 +541,7 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read<?> read =
+    AvroIO.Read<GenericRecord> read =
         AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -545,7 +551,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
+    AvroIO.Write<GenericClass> write = AvroIO.write(GenericClass.class)
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
@@ -572,8 +578,8 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
-        .to(outputPath);
+    AvroIO.Write<GenericRecord> write =
+        AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING)).to(outputPath);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive transform",


[09/11] beam git commit: Adds AvroIO.readGenericRecords()

Posted by jk...@apache.org.
Adds AvroIO.readGenericRecords()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff7a1d42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff7a1d42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff7a1d42

Branch: refs/heads/master
Commit: ff7a1d42f2902bebdf998d3f00b2b268ba150058
Parents: 1499d25
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:36:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |  2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 31 ++++++++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  8 ++---
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  8 ++---
 4 files changed, 19 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e3a44d2..62db14f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
 
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
-        AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
+        AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index abde9cb..ed172d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -133,6 +133,18 @@ public class AvroIO {
     return new Read<>();
   }
 
+  /** Reads Avro file(s) containing records of the specified schema. */
+  public static Read<GenericRecord> readGenericRecords(Schema schema) {
+    return new Read<>(null, null, GenericRecord.class, schema);
+  }
+
+  /**
+   * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+   */
+  public static Read<GenericRecord> readGenericRecords(String schema) {
+    return readGenericRecords(new Schema.Parser().parse(schema));
+  }
+
   /** Implementation of {@link #read}. */
   public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     /** The filepattern to read from. */
@@ -178,25 +190,6 @@ public class AvroIO {
       return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that reads Avro file(s) containing records of the specified schema.
-     */
-    public Read<GenericRecord> withSchema(Schema schema) {
-      return new Read<>(name, filepattern, GenericRecord.class, schema);
-    }
-
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that reads Avro file(s) containing records of the specified schema
-     * in a JSON-encoded string form.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
-    }
-
     @Override
     public PCollection<T> expand(PBegin input) {
       if (filepattern == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 6d842b3..2144b0d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -282,10 +282,6 @@ public class AvroIOTest {
     p.run();
   }
 
-  private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
-    return TimestampedValue.of(element, new Instant(0).plus(duration));
-  }
-
   private static class WindowedFilenamePolicy extends FilenamePolicy {
     String outputFilePrefix;
 
@@ -550,8 +546,8 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read<?> read = AvroIO.read().from("foo.*")
-        .withSchema(Schema.create(Schema.Type.STRING));
+    AvroIO.Read<?> read =
+        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("AvroIO.Read should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 06b9841..b974663 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
                   // test read using schema object
                   new Object[] {
                       null,
-                      AvroIO.read().withSchema(SCHEMA),
+                      AvroIO.readGenericRecords(SCHEMA),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.read().withSchema(SCHEMA),
+                      AvroIO.readGenericRecords(SCHEMA),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
                   // test read using schema string
                   new Object[] {
                       null,
-                      AvroIO.read().withSchema(SCHEMA_STRING),
+                      AvroIO.readGenericRecords(SCHEMA_STRING),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.read().withSchema(SCHEMA_STRING),
+                      AvroIO.readGenericRecords(SCHEMA_STRING),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString


[04/11] beam git commit: Fixes javadoc of TextIO to not point to AvroIO

Posted by jk...@apache.org.
Fixes javadoc of TextIO to not point to AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2fa3c348
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2fa3c348
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2fa3c348

Branch: refs/heads/master
Commit: 2fa3c348d5a6aab6a7da7c6c62f6b9254feb13af
Parents: 6d443bc
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 29 16:15:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2fa3c348/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6b58391..0947702 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -85,9 +85,9 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
- * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
  * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
  * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be
  * set, and unique windows and triggers must produce unique filenames.
  *


[06/11] beam git commit: Removes AvroIO.Read.Bound

Posted by jk...@apache.org.
Removes AvroIO.Read.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1499d256
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1499d256
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1499d256

Branch: refs/heads/master
Commit: 1499d256c616e34b4416fa202a45aa256ac88d20
Parents: 0166e19
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:19:21 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 222 +++++++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  24 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  18 +-
 4 files changed, 108 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 2a73c28..e3a44d2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,7 +74,7 @@ public class AvroPipelineTest {
 
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
-        AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
+        AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 75e14d5..abde9cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -63,7 +63,7 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // A simple Read of a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.Read.from("/path/to/file.avro")
+ *     p.apply(AvroIO.read().from("/path/to/file.avro")
  *                 .withSchema(AvroAutoGenClass.class));
  *
  * // A Read from a GCS file (runs locally and using remote execution):
@@ -125,15 +125,39 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class AvroIO {
   /**
-   * A root {@link PTransform} that reads from an Avro file (or multiple Avro
-   * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each record.
+   * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
+   *
+   * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
-  public static class Read {
+  public static <T> Read<T> read() {
+    return new Read<>();
+  }
+
+  /** Implementation of {@link #read}. */
+  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    /** The filepattern to read from. */
+    @Nullable
+    final String filepattern;
+    /** The class type of the records. */
+    @Nullable
+    final Class<T> type;
+    /** The schema of the input file. */
+    @Nullable
+    final Schema schema;
+
+    Read() {
+      this(null, null, null, null);
+    }
+
+    Read(String name, String filepattern, Class<T> type, Schema schema) {
+      super(name);
+      this.filepattern = filepattern;
+      this.type = type;
+      this.schema = schema;
+    }
 
     /**
-     * Returns a {@link PTransform} that reads from the file(s)
-     * with the given name or pattern. This can be a local filename
+     * Reads from the file(s) with the given name or pattern. This can be a local filename
      * or filename pattern (if running locally), or a Google Cloud
      * Storage filename or filename pattern of the form
      * {@code "gs://<bucket>/<filepath>"} (if running locally or
@@ -141,162 +165,82 @@ public class AvroIO {
      * <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html">Java
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
-    public static Bound<GenericRecord> from(String filepattern) {
-      return new Bound<>(GenericRecord.class).from(filepattern);
+    public Read<T> from(String filepattern) {
+      return new Read<>(name, filepattern, type, schema);
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records whose type is the specified Avro-generated class.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting {@link PCollection}
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records whose type is the
+     * specified Avro-generated class.
      */
-    public static <T> Bound<T> withSchema(Class<T> type) {
-      return new Bound<>(type).withSchema(type);
+    public Read<T> withSchema(Class<T> type) {
+      return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records of the specified schema.
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records of the specified schema.
      */
-    public static Bound<GenericRecord> withSchema(Schema schema) {
-      return new Bound<>(GenericRecord.class).withSchema(schema);
+    public Read<GenericRecord> withSchema(Schema schema) {
+      return new Read<>(name, filepattern, GenericRecord.class, schema);
     }
 
     /**
-     * Returns a {@link PTransform} that reads Avro file(s)
-     * containing records of the specified schema in a JSON-encoded
-     * string form.
+     * Returns a new {@link PTransform} that's like this one but
+     * that reads Avro file(s) containing records of the specified schema
+     * in a JSON-encoded string form.
+     *
+     * <p>Does not modify this object.
      */
-    public static Bound<GenericRecord> withSchema(String schema) {
+    public Read<GenericRecord> withSchema(String schema) {
       return withSchema((new Schema.Parser()).parse(schema));
     }
 
-    /**
-     * A {@link PTransform} that reads from an Avro file (or multiple Avro
-     * files matching a pattern) and returns a bounded {@link PCollection} containing
-     * the decoding of each record.
-     *
-     * @param <T> the type of each of the elements of the resulting
-     * PCollection
-     */
-    public static class Bound<T> extends PTransform<PBegin, PCollection<T>> {
-      /** The filepattern to read from. */
-      @Nullable
-      final String filepattern;
-      /** The class type of the records. */
-      final Class<T> type;
-      /** The schema of the input file. */
-      @Nullable
-      final Schema schema;
-
-      Bound(Class<T> type) {
-        this(null, null, type, null);
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      if (filepattern == null) {
+        throw new IllegalStateException(
+            "need to set the filepattern of an AvroIO.Read transform");
       }
-
-      Bound(String name, String filepattern, Class<T> type, Schema schema) {
-        super(name);
-        this.filepattern = filepattern;
-        this.type = type;
-        this.schema = schema;
+      if (schema == null) {
+        throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
       }
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads from the file(s) with the given name or pattern.
-       * (See {@link AvroIO.Read#from} for a description of
-       * filepatterns.)
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> from(String filepattern) {
-        return new Bound<>(name, filepattern, type, schema);
-      }
+      @SuppressWarnings("unchecked")
+      Bounded<T> read =
+          type == GenericRecord.class
+              ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
+                  AvroSource.from(filepattern).withSchema(schema))
+              : org.apache.beam.sdk.io.Read.from(
+                  AvroSource.from(filepattern).withSchema(type));
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records whose type is the
-       * specified Avro-generated class.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements and the elements of
-       * the resulting PCollection
-       */
-      public <X> Bound<X> withSchema(Class<X> type) {
-        return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type));
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records of the specified schema.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(Schema schema) {
-        return new Bound<>(name, filepattern, GenericRecord.class, schema);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that reads Avro file(s) containing records of the specified schema
-       * in a JSON-encoded string form.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(String schema) {
-        return withSchema((new Schema.Parser()).parse(schema));
-      }
-
-      @Override
-      public PCollection<T> expand(PBegin input) {
-        if (filepattern == null) {
-          throw new IllegalStateException(
-              "need to set the filepattern of an AvroIO.Read transform");
-        }
-        if (schema == null) {
-          throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
-        }
-
-        @SuppressWarnings("unchecked")
-        Bounded<T> read =
-            type == GenericRecord.class
-                ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                    AvroSource.from(filepattern).withSchema(schema))
-                : org.apache.beam.sdk.io.Read.from(
-                    AvroSource.from(filepattern).withSchema(type));
-
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-          .addIfNotNull(DisplayData.item("filePattern", filepattern)
-            .withLabel("Input File Pattern"));
-      }
+      PCollection<T> pcol = input.getPipeline().apply("Read", read);
+      // Honor the default output coder that would have been used by this PTransform.
+      pcol.setCoder(getDefaultOutputCoder());
+      return pcol;
+    }
 
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return AvroCoder.of(type, schema);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+        .addIfNotNull(DisplayData.item("filePattern", filepattern)
+          .withLabel("Input File Pattern"));
+    }
 
-      public String getFilepattern() {
-        return filepattern;
-      }
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return AvroCoder.of(type, schema);
+    }
 
-      public Schema getSchema() {
-        return schema;
-      }
+    public String getFilepattern() {
+      return filepattern;
     }
 
-    /** Disallow construction of utility class. */
-    private Read() {}
+    public Schema getSchema() {
+      return schema;
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index ece7997..6d842b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -103,7 +103,7 @@ public class AvroIOTest {
 
   @Test
   public void testAvroIOGetName() {
-    assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
+    assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
     assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
   }
 
@@ -150,8 +150,11 @@ public class AvroIOTest {
           .withSchema(GenericClass.class));
     p.run();
 
-    PCollection<GenericClass> input = p
-        .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
+    PCollection<GenericClass> input =
+        p.apply(
+            AvroIO.<GenericClass>read()
+                .from(outputFile.getAbsolutePath())
+                .withSchema(GenericClass.class));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -173,7 +176,7 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.Read
+        .apply(AvroIO.<GenericClass>read()
             .from(outputFile.getAbsolutePath())
             .withSchema(GenericClass.class));
 
@@ -200,7 +203,7 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.Read
+        .apply(AvroIO.<GenericClass>read()
             .from(outputFile.getAbsolutePath())
             .withSchema(GenericClass.class));
 
@@ -269,8 +272,11 @@ public class AvroIOTest {
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
         new GenericClassV2(5, "bar", null));
 
-    PCollection<GenericClassV2> input = p
-        .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
+    PCollection<GenericClassV2> input =
+        p.apply(
+            AvroIO.<GenericClassV2>read()
+                .from(outputFile.getAbsolutePath())
+                .withSchema(GenericClassV2.class));
 
     PAssert.that(input).containsInAnyOrder(expected);
     p.run();
@@ -533,7 +539,7 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*");
+    AvroIO.Read<?> read = AvroIO.read().from("foo.*");
 
     DisplayData displayData = DisplayData.from(read);
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
@@ -544,7 +550,7 @@ public class AvroIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*")
+    AvroIO.Read<?> read = AvroIO.read().from("foo.*")
         .withSchema(Schema.create(Schema.Type.STRING));
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);

http://git-wip-us.apache.org/repos/asf/beam/blob/1499d256/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 3cf52a4..06b9841 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -138,13 +138,13 @@ public class AvroIOTransformTest {
     }
 
     private <T> void runTestRead(@Nullable final String applyName,
-                                 final AvroIO.Read.Bound<T> readBuilder,
+                                 final AvroIO.Read<T> readBuilder,
                                  final String expectedName,
                                  final T[] expectedOutput) throws Exception {
 
       final File avroFile = tmpFolder.newFile("file.avro");
       generateAvroFile(generateAvroObjects(), avroFile);
-      final AvroIO.Read.Bound<T> read = readBuilder.from(avroFile.getPath());
+      final AvroIO.Read<T> read = readBuilder.from(avroFile.getPath());
       final PCollection<T> output =
           applyName == null ? pipeline.apply(read) : pipeline.apply(applyName, read);
 
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
                   // test read using generated class
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(AvroGeneratedUser.class),
+                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
                       "AvroIO.Read/Read.out",
                       generateAvroObjects(),
                       generatedClass
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(AvroGeneratedUser.class),
+                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
                       "MyRead/Read.out",
                       generateAvroObjects(),
                       generatedClass
@@ -185,14 +185,14 @@ public class AvroIOTransformTest {
                   // test read using schema object
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(SCHEMA),
+                      AvroIO.read().withSchema(SCHEMA),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(SCHEMA),
+                      AvroIO.read().withSchema(SCHEMA),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchema
@@ -201,14 +201,14 @@ public class AvroIOTransformTest {
                   // test read using schema string
                   new Object[] {
                       null,
-                      AvroIO.Read.withSchema(SCHEMA_STRING),
+                      AvroIO.read().withSchema(SCHEMA_STRING),
                       "AvroIO.Read/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.Read.withSchema(SCHEMA_STRING),
+                      AvroIO.read().withSchema(SCHEMA_STRING),
                       "MyRead/Read.out",
                       generateAvroGenericRecords(),
                       fromSchemaString
@@ -221,7 +221,7 @@ public class AvroIOTransformTest {
     public String transformName;
 
     @Parameterized.Parameter(1)
-    public AvroIO.Read.Bound readTransform;
+    public AvroIO.Read readTransform;
 
     @Parameterized.Parameter(2)
     public String expectedReadTransformName;


[10/11] beam git commit: Moves AvroIO.Read.withSchema into read()

Posted by jk...@apache.org.
Moves AvroIO.Read.withSchema into read()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/abb4916c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/abb4916c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/abb4916c

Branch: refs/heads/master
Commit: abb4916ce2fa8d4a5caf783b66cc5541053ea83c
Parents: d1dfd4e
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:03:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 35 ++++++++------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 24 ++++++--------
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  4 +--
 3 files changed, 25 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4bde6ec..08fc8a9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -46,16 +46,15 @@ import org.apache.beam.sdk.values.PDone;
  * {@link PTransform}s for reading and writing Avro files.
  *
  * <p>To read a {@link PCollection} from one or more Avro files, use
- * {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
+ * {@code AvroIO.read()}, specifying {@link AvroIO.Read#from} to specify
  * the path of the file(s) to read from (e.g., a local filename or
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
  *
- * <p>It is required to specify {@link AvroIO.Read#withSchema}. To
- * read specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To read {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or an Avro schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
+ * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
  * schema.
  *
  * <p>For example:
@@ -64,15 +63,13 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // A simple Read of a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read().from("/path/to/file.avro")
- *                 .withSchema(AvroAutoGenClass.class));
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
  *
  * // A Read from a GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.read()
- *                .from("gs://my_bucket/path/to/records-*.avro")
- *                .withSchema(schema));
+ *     p.apply(AvroIO.readGenericRecords(schema)
+ *                .from("gs://my_bucket/path/to/records-*.avro"));
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
@@ -130,8 +127,11 @@ public class AvroIO {
    *
    * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
-  public static <T> Read<T> read() {
-    return new AutoValue_AvroIO_Read.Builder<T>().build();
+  public static <T> Read<T> read(Class<T> recordClass) {
+    return new AutoValue_AvroIO_Read.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        .build();
   }
 
   /** Reads Avro file(s) containing records of the specified schema. */
@@ -188,15 +188,6 @@ public class AvroIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * that reads Avro file(s) containing records whose type is the
-     * specified Avro-generated class.
-     */
-    public Read<T> withSchema(Class<T> type) {
-      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
-    }
-
     @Override
     public PCollection<T> expand(PBegin input) {
       if (getFilepattern() == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 7df1b18..38984b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -102,7 +102,7 @@ public class AvroIOTest {
 
   @Test
   public void testAvroIOGetName() {
-    assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
+    assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
     assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
   }
 
@@ -151,9 +151,8 @@ public class AvroIOTest {
 
     PCollection<GenericClass> input =
         p.apply(
-            AvroIO.<GenericClass>read()
-                .from(outputFile.getAbsolutePath())
-                .withSchema(GenericClass.class));
+            AvroIO.read(GenericClass.class)
+                .from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -175,9 +174,8 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.<GenericClass>read()
-            .from(outputFile.getAbsolutePath())
-            .withSchema(GenericClass.class));
+        .apply(AvroIO.read(GenericClass.class)
+            .from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -202,9 +200,8 @@ public class AvroIOTest {
     p.run();
 
     PCollection<GenericClass> input = p
-        .apply(AvroIO.<GenericClass>read()
-            .from(outputFile.getAbsolutePath())
-            .withSchema(GenericClass.class));
+        .apply(AvroIO.read(GenericClass.class)
+            .from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(values);
     p.run();
@@ -273,9 +270,8 @@ public class AvroIOTest {
 
     PCollection<GenericClassV2> input =
         p.apply(
-            AvroIO.<GenericClassV2>read()
-                .from(outputFile.getAbsolutePath())
-                .withSchema(GenericClassV2.class));
+            AvroIO.read(GenericClassV2.class)
+                .from(outputFile.getAbsolutePath()));
 
     PAssert.that(input).containsInAnyOrder(expected);
     p.run();
@@ -535,7 +531,7 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read<?> read = AvroIO.read().from("foo.*");
+    AvroIO.Read<?> read = AvroIO.read(String.class).from("foo.*");
 
     DisplayData displayData = DisplayData.from(read);
     assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));

http://git-wip-us.apache.org/repos/asf/beam/blob/abb4916c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index ba7f1b9..51c9691 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -169,14 +169,14 @@ public class AvroIOTransformTest {
                   // test read using generated class
                   new Object[] {
                       null,
-                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+                      AvroIO.read(AvroGeneratedUser.class),
                       "AvroIO.Read/Read.out",
                       generateAvroObjects(),
                       generatedClass
                   },
                   new Object[] {
                       "MyRead",
-                      AvroIO.<AvroGeneratedUser>read().withSchema(AvroGeneratedUser.class),
+                      AvroIO.read(AvroGeneratedUser.class),
                       "MyRead/Read.out",
                       generateAvroObjects(),
                       generatedClass


[02/11] beam git commit: Moves AvroIO.write().withSchema into write()

Posted by jk...@apache.org.
Moves AvroIO.write().withSchema into write()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/27d74622
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/27d74622
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/27d74622

Branch: refs/heads/master
Commit: 27d74622e877d017aa70feef0ee4cd26a4bece7a
Parents: e0d7475
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:25:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 49 +++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 42 +++++++----------
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  2 +-
 3 files changed, 40 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 8cdd4e7..6b66a98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -89,26 +89,23 @@ import org.apache.beam.sdk.values.PDone;
  * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
  * unique filenames.
  *
- * <p>It is required to specify {@link AvroIO.Write#withSchema}. To
- * write specific records, such as Avro-generated classes, provide an
- * Avro-generated class type. To write {@link GenericRecord GenericRecords}, provide either
- * a {@link Schema} object or a schema in a JSON-encoded string form.
- * An exception will be thrown if a record doesn't match the specified
- * schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
+ * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
+ * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
+ * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
+ * specified schema.
  *
  * <p>For example:
  * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.write().to("/path/to/file.avro")
- *                           .withSchema(AvroAutoGenClass.class));
+ * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
  *
  * // A Write to a sharded GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.write()
+ * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
  *     .to("gs://my_bucket/path/to/numbers")
- *     .withSchema(schema)
  *     .withSuffix(".avro"));
  * } </pre>
  *
@@ -153,26 +150,31 @@ public class AvroIO {
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
-  public static <T> Write<T> write() {
-    return new AutoValue_AvroIO_Write.Builder<T>()
-        .setFilenameSuffix("")
-        .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
-        .setCodec(Write.DEFAULT_CODEC)
-        .setMetadata(ImmutableMap.<String, Object>of())
-        .setWindowedWrites(false)
+  public static <T> Write<T> write(Class<T> recordClass) {
+    return AvroIO.<T>defaultWriteBuilder()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
         .build();
   }
 
   /** Writes Avro records of the specified schema. */
   public static Write<GenericRecord> writeGenericRecords(Schema schema) {
-    return AvroIO.<GenericRecord>write()
-        .toBuilder()
+    return AvroIO.<GenericRecord>defaultWriteBuilder()
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .build();
   }
 
+  private static <T> Write.Builder<T> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_Write.Builder<T>()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setCodec(Write.DEFAULT_CODEC)
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setWindowedWrites(false);
+  }
+
   /**
    * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
    */
@@ -369,13 +371,6 @@ public class AvroIO {
       return toBuilder().setWindowedWrites(true).build();
     }
 
-    /**
-     * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
-     */
-    public Write<T> withSchema(Class<T> type) {
-      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
-    }
-
     /** Writes to Avro file(s) compressed using specified codec. */
     public Write<T> withCodec(CodecFactory codec) {
       return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();

http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4abd3e0..e421b96 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -45,7 +45,6 @@ import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
@@ -103,7 +102,7 @@ public class AvroIOTest {
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.read(String.class).from("gs://bucket/foo*/baz").getName());
-    assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
+    assertEquals("AvroIO.Write", AvroIO.write(String.class).to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)
@@ -144,9 +143,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
-          .withoutSharding()
-          .withSchema(GenericClass.class));
+      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+          .withoutSharding());
     p.run();
 
     PCollection<GenericClass> input =
@@ -167,10 +165,9 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withCodec(CodecFactory.deflateCodec(9))
-            .withSchema(GenericClass.class));
+            .withCodec(CodecFactory.deflateCodec(9)));
     p.run();
 
     PCollection<GenericClass> input = p
@@ -193,9 +190,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withSchema(GenericClass.class)
             .withCodec(CodecFactory.nullCodec()));
     p.run();
 
@@ -260,9 +256,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
-          .withoutSharding()
-          .withSchema(GenericClass.class));
+      .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
+          .withoutSharding());
     p.run();
 
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
@@ -367,10 +362,9 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.write(GenericClass.class).to(new WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
-            .withNumShards(2)
-            .withSchema(GenericClass.class));
+            .withNumShards(2));
     windowedAvroWritePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order
@@ -402,14 +396,14 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz");
     assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<?> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -418,7 +412,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
@@ -430,7 +424,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
+    AvroIO.Write<String> write = AvroIO.write(String.class)
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
@@ -448,9 +442,8 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
+        .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath())
             .withoutSharding()
-            .withSchema(GenericClass.class)
             .withMetadata(ImmutableMap.<String, Object>of(
                 "stringKey", "stringValue",
                 "longKey", 100L,
@@ -471,7 +464,7 @@ public class AvroIOTest {
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
     AvroIO.Write<String> write =
-        AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
+        AvroIO.write(String.class).to(outputFilePrefix);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -552,11 +545,10 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write<?> write = AvroIO.<GenericClass>write()
+    AvroIO.Write<?> write = AvroIO.write(GenericClass.class)
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
-        .withSchema(GenericClass.class)
         .withNumShards(100)
         .withCodec(CodecFactory.snappyCodec());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/27d74622/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index fb57d5c..b4f7a79 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,7 +271,7 @@ public class AvroIOTransformTest {
           ImmutableList.<Object[]>builder()
               .add(
                   new Object[] {
-                      AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
+                      AvroIO.write(AvroGeneratedUser.class),
                       generatedClass
                   },
                   new Object[] {


[11/11] beam git commit: This closes #2778

Posted by jk...@apache.org.
This closes #2778


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/034565c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/034565c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/034565c6

Branch: refs/heads/master
Commit: 034565c6811833fa1143362fb44f94672cef1e30
Parents: 6d443bc caf2fae
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon May 1 18:43:45 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:45 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |    4 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 1193 +++++-------------
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  142 +++
 .../java/org/apache/beam/sdk/io/AvroSource.java |    4 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |    4 +-
 .../beam/sdk/testing/SourceTestUtils.java       |    5 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  108 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |   30 +-
 8 files changed, 521 insertions(+), 969 deletions(-)
----------------------------------------------------------------------



[08/11] beam git commit: Converts AvroIO.Write to AutoValue; adds writeGenericRecords()

Posted by jk...@apache.org.
Converts AvroIO.Write to AutoValue; adds writeGenericRecords()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0d74750
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0d74750
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0d74750

Branch: refs/heads/master
Commit: e0d74750da73658a067e7522f18c23c5e622fb2f
Parents: abb4916
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 19:21:15 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 355 +++++--------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  19 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |   4 +-
 4 files changed, 105 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index c58d81e..7188dc5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
-    input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
+    input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
     p.run().waitUntilFinish();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 08fc8a9..8cdd4e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -154,7 +154,30 @@ public class AvroIO {
    * pattern).
    */
   public static <T> Write<T> write() {
-    return new Write<>(null);
+    return new AutoValue_AvroIO_Write.Builder<T>()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setCodec(Write.DEFAULT_CODEC)
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setWindowedWrites(false)
+        .build();
+  }
+
+  /** Writes Avro records of the specified schema. */
+  public static Write<GenericRecord> writeGenericRecords(Schema schema) {
+    return AvroIO.<GenericRecord>write()
+        .toBuilder()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
+  }
+
+  /**
+   * Like {@link #writeGenericRecords(Schema)} but the schema is specified as a JSON-encoded string.
+   */
+  public static Write<GenericRecord> writeGenericRecords(String schema) {
+    return writeGenericRecords(new Schema.Parser().parse(schema));
   }
 
   /** Implementation of {@link #read}. */
@@ -229,7 +252,8 @@ public class AvroIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
-  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+  @AutoValue
+  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     /**
      * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
      * multiple Avro files matching a sharding pattern).
@@ -242,80 +266,38 @@ public class AvroIO {
     // This should be a multiple of 4 to not get a partial encoded byte.
     private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
-    /** The filename to write to. */
-    @Nullable
-    final String filenamePrefix;
-    /** Suffix to use for each filename. */
-    final String filenameSuffix;
-    /** Requested number of shards. 0 for automatic. */
-    final int numShards;
-    /** Shard template string. */
-    final String shardTemplate;
-    /** The class type of the records. */
-    final Class<T> type;
-    /** The schema of the output file. */
-    @Nullable
-    final Schema schema;
-    final boolean windowedWrites;
-    FileBasedSink.FilenamePolicy filenamePolicy;
-
+    @Nullable abstract String getFilenamePrefix();
+    abstract String getFilenameSuffix();
+    abstract int getNumShards();
+    abstract String getShardTemplate();
+    abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+    abstract boolean getWindowedWrites();
+    @Nullable abstract FileBasedSink.FilenamePolicy getFilenamePolicy();
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
      */
-    final SerializableAvroCodecFactory codec;
+    abstract SerializableAvroCodecFactory getCodec();
     /** Avro file metadata. */
-    final ImmutableMap<String, Object> metadata;
-
-    Write(Class<T> type) {
-      this(
-          null,
-          null,
-          "",
-          0,
-          DEFAULT_SHARD_TEMPLATE,
-          type,
-          null,
-          DEFAULT_CODEC,
-          ImmutableMap.<String, Object>of(),
-          false,
-          null);
-    }
+    abstract ImmutableMap<String, Object> getMetadata();
 
-    Write(
-        String name,
-        String filenamePrefix,
-        String filenameSuffix,
-        int numShards,
-        String shardTemplate,
-        Class<T> type,
-        Schema schema,
-        SerializableAvroCodecFactory codec,
-        Map<String, Object> metadata,
-        boolean windowedWrites,
-        FileBasedSink.FilenamePolicy filenamePolicy) {
-      super(name);
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.type = type;
-      this.schema = schema;
-      this.codec = codec;
-      this.windowedWrites = windowedWrites;
-      this.filenamePolicy = filenamePolicy;
+    abstract Builder<T> toBuilder();
 
-      Map<String, String> badKeys = Maps.newLinkedHashMap();
-      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-        Object v = entry.getValue();
-        if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
-          badKeys.put(entry.getKey(), v.getClass().getSimpleName());
-        }
-      }
-      checkArgument(
-          badKeys.isEmpty(),
-          "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
-      this.metadata = ImmutableMap.copyOf(metadata);
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilenamePrefix(String filenamePrefix);
+      abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+      abstract Builder<T> setNumShards(int numShards);
+      abstract Builder<T> setShardTemplate(String shardTemplate);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+      abstract Builder<T> setFilenamePolicy(FileBasedSink.FilenamePolicy filenamePolicy);
+      abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
+      abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+
+      abstract Write<T> build();
     }
 
     /**
@@ -330,34 +312,12 @@ public class AvroIO {
      */
     public Write<T> to(String filenamePrefix) {
       validateOutputComponent(filenamePrefix);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
     public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
@@ -367,18 +327,7 @@ public class AvroIO {
      */
     public Write<T> withSuffix(String filenameSuffix) {
       validateOutputComponent(filenameSuffix);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
@@ -394,18 +343,7 @@ public class AvroIO {
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
@@ -415,18 +353,7 @@ public class AvroIO {
      * @see ShardNameTemplate
      */
     public Write<T> withShardNameTemplate(String shardTemplate) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
@@ -439,76 +366,19 @@ public class AvroIO {
     }
 
     public Write<T> withWindowedWrites() {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          true,
-          filenamePolicy);
+      return toBuilder().setWindowedWrites(true).build();
     }
 
     /**
      * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
      */
     public Write<T> withSchema(Class<T> type) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          ReflectData.get().getSchema(type),
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
-    }
-
-    /** Writes to Avro file(s) containing records of the specified schema. */
-    public Write<GenericRecord> withSchema(Schema schema) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          GenericRecord.class,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
-    }
-
-    /**
-     * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string
-     * form.
-     */
-    public Write<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
+      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
     }
 
     /** Writes to Avro file(s) compressed using specified codec. */
     public Write<T> withCodec(CodecFactory codec) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          new SerializableAvroCodecFactory(codec),
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
     }
 
     /**
@@ -517,56 +387,56 @@ public class AvroIO {
      * <p>Supported value types are String, Long, and byte[].
      */
     public Write<T> withMetadata(Map<String, Object> metadata) {
-      return new Write<>(
-          name,
-          filenamePrefix,
-          filenameSuffix,
-          numShards,
-          shardTemplate,
-          type,
-          schema,
-          codec,
-          metadata,
-          windowedWrites,
-          filenamePolicy);
+      Map<String, String> badKeys = Maps.newLinkedHashMap();
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        Object v = entry.getValue();
+        if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+          badKeys.put(entry.getKey(), v.getClass().getSimpleName());
+        }
+      }
+      checkArgument(
+          badKeys.isEmpty(),
+          "Metadata value type must be one of String, Long, or byte[]. Found {}",
+          badKeys);
+      return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (filenamePolicy == null && filenamePrefix == null) {
+      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of an AvroIO.Write transform");
       }
-      if (filenamePolicy != null && filenamePrefix != null) {
+      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
-      if (schema == null) {
+      if (getSchema() == null) {
         throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
       }
 
       WriteFiles<T> write = null;
-      if (filenamePolicy != null) {
+      if (getFilenamePolicy() != null) {
         write = WriteFiles.to(
             new AvroSink<>(
-                filenamePolicy,
-                AvroCoder.of(type, schema),
-                codec,
-                metadata));
+                getFilenamePolicy(),
+                AvroCoder.of(getRecordClass(), getSchema()),
+                getCodec(),
+                getMetadata()));
       } else {
         write = WriteFiles.to(
             new AvroSink<>(
-                filenamePrefix,
-                filenameSuffix,
-                shardTemplate,
-                AvroCoder.of(type, schema),
-                codec,
-                metadata));
+                getFilenamePrefix(),
+                getFilenameSuffix(),
+                getShardTemplate(),
+                AvroCoder.of(getRecordClass(), getSchema()),
+                getCodec(),
+                getMetadata()));
       }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         write = write.withWindowedWrites();
       }
       return input.apply("Write", write);
@@ -576,20 +446,20 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .add(DisplayData.item("schema", type)
+          .add(DisplayData.item("schema", getRecordClass())
             .withLabel("Record Schema"))
-          .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+          .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix())
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
               .withLabel("Output Shard Name Template"),
               DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
               .withLabel("Output File Suffix"),
               "")
-          .addIfNotDefault(DisplayData.item("numShards", numShards)
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"),
               0)
-          .addIfNotDefault(DisplayData.item("codec", codec.toString())
+          .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
               .withLabel("Avro Compression Codec"),
               DEFAULT_CODEC.toString());
       builder.include("Metadata", new Metadata());
@@ -598,7 +468,7 @@ public class AvroIO {
     private class Metadata implements HasDisplayData {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
           DisplayData.Type type = DisplayData.inferType(entry.getValue());
           if (type != null) {
             builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
@@ -612,49 +482,10 @@ public class AvroIO {
       }
     }
 
-    /**
-     * Returns the current shard name template string.
-     */
-    public String getShardNameTemplate() {
-      return shardTemplate;
-    }
-
     @Override
     protected Coder<Void> getDefaultOutputCoder() {
       return VoidCoder.of();
     }
-
-    public String getFilenamePrefix() {
-      return filenamePrefix;
-    }
-
-    public String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    public int getNumShards() {
-      return numShards;
-    }
-
-    public String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-
-    public Class<T> getType() {
-      return type;
-    }
-
-    public Schema getSchema() {
-      return schema;
-    }
-
-    public CodecFactory getCodec() {
-      return codec.getCodec();
-    }
-
-    public Map<String, Object> getMetadata() {
-      return metadata;
-    }
   }
 
   // Pattern which matches old-style shard output patterns, which are now

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 38984b5..4abd3e0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -422,9 +422,9 @@ public class AvroIOTest {
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
-    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
-    assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString());
+    assertEquals(
+        CodecFactory.deflateCodec(9).toString(),
+        SerializableUtils.clone(write.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -434,9 +434,9 @@ public class AvroIOTest {
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
-    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
-
-    assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
+    assertEquals(
+        CodecFactory.xzCodec(9).toString(),
+        SerializableUtils.clone(write.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -482,7 +482,7 @@ public class AvroIOTest {
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
     p.run();
 
-    String shardNameTemplate = write.getShardNameTemplate();
+    String shardNameTemplate = write.getShardTemplate();
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -580,9 +580,8 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
-        .to(outputPath)
-        .withSchema(Schema.create(Schema.Type.STRING));
+    AvroIO.Write<?> write = AvroIO.writeGenericRecords(Schema.create(Schema.Type.STRING))
+        .to(outputPath);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/e0d74750/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index 51c9691..fb57d5c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -275,12 +275,12 @@ public class AvroIOTransformTest {
                       generatedClass
                   },
                   new Object[] {
-                      AvroIO.write().withSchema(SCHEMA),
+                      AvroIO.writeGenericRecords(SCHEMA),
                       fromSchema
                   },
 
                   new Object[] {
-                      AvroIO.write().withSchema(SCHEMA_STRING),
+                      AvroIO.writeGenericRecords(SCHEMA_STRING),
                       fromSchemaString
                   })
               .build();


[07/11] beam git commit: Removes AvroIO.Write.Bound

Posted by jk...@apache.org.
Removes AvroIO.Write.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1dfd4e2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1dfd4e2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1dfd4e2

Branch: refs/heads/master
Commit: d1dfd4e2a8b82451f28f1f0e6f261eae0d51bb5b
Parents: 439f2ca
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:59:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 910 ++++++++-----------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  37 +-
 .../apache/beam/sdk/io/AvroIOTransformTest.java |  12 +-
 4 files changed, 385 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 62db14f..c58d81e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -75,7 +75,7 @@ public class AvroPipelineTest {
     Pipeline p = pipelineRule.createPipeline();
     PCollection<GenericRecord> input = p.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
-    input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
+    input.apply(AvroIO.write().to(outputDir.getAbsolutePath()).withSchema(schema));
     p.run().waitUntilFinish();
 
     List<GenericRecord> records = readGenericFile();

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2f1d917..4bde6ec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -70,24 +70,24 @@ import org.apache.beam.sdk.values.PDone;
  * // A Read from a GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.Read
+ *     p.apply(AvroIO.read()
  *                .from("gs://my_bucket/path/to/records-*.avro")
  *                .withSchema(schema));
  * } </pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use
- * {@link AvroIO.Write}, specifying {@link AvroIO.Write#to(String)} to specify
+ * {@link AvroIO.Write}, specifying {@code AvroIO.write().to(String)} to specify
  * the path of the file to write to (e.g., a local filename or sharded
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). {@link AvroIO.Write#to(FileBasedSink.FilenamePolicy)}
+ * {@code "gs://<bucket>/<filepath>"}). {@code AvroIO.write().to(FileBasedSink.FilenamePolicy)}
  * can also be used to specify a custom file naming policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
- * {@link AvroIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be
  * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a
 * runner-chosen value, so you may not need to set it yourself. A
  * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
  * unique filenames.
@@ -103,13 +103,13 @@ import org.apache.beam.sdk.values.PDone;
  * <pre> {@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.Write.to("/path/to/file.avro")
+ * records.apply(AvroIO.write().to("/path/to/file.avro")
  *                           .withSchema(AvroAutoGenClass.class));
  *
  * // A Write to a sharded GCS file (runs locally and using remote execution):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.Write
+ * records.apply("WriteToAvro", AvroIO.write()
  *     .to("gs://my_bucket/path/to/numbers")
  *     .withSchema(schema)
  *     .withSuffix(".avro"));
@@ -149,6 +149,14 @@ public class AvroIO {
     return readGenericRecords(new Schema.Parser().parse(schema));
   }
 
+  /**
+   * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
+   * pattern).
+   */
+  public static <T> Write<T> write() {
+    return new Write<>(null);
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -229,45 +237,161 @@ public class AvroIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A root {@link PTransform} that writes a {@link PCollection} to an Avro file (or
-   * multiple Avro files matching a sharding pattern).
-   */
-  public static class Write {
+  /** Implementation of {@link #write}. */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    /**
+     * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
+     * multiple Avro files matching a sharding pattern).
+     *
+     * @param <T> the type of each of the elements of the input PCollection
+     */
+    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+    private static final SerializableAvroCodecFactory DEFAULT_CODEC =
+        new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
+    // This should be a multiple of 4 to not get a partial encoded byte.
+    private static final int METADATA_BYTES_MAX_LENGTH = 40;
+
+    /** The filename to write to. */
+    @Nullable
+    final String filenamePrefix;
+    /** Suffix to use for each filename. */
+    final String filenameSuffix;
+    /** Requested number of shards. 0 for automatic. */
+    final int numShards;
+    /** Shard template string. */
+    final String shardTemplate;
+    /** The class type of the records. */
+    final Class<T> type;
+    /** The schema of the output file. */
+    @Nullable
+    final Schema schema;
+    final boolean windowedWrites;
+    FileBasedSink.FilenamePolicy filenamePolicy;
 
     /**
-     * Returns a {@link PTransform} that writes to the file(s)
-     * with the given prefix. This can be a local filename
+     * The codec used to encode the blocks in the Avro file. String value drawn from those in
+     * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
+     */
+    final SerializableAvroCodecFactory codec;
+    /** Avro file metadata. */
+    final ImmutableMap<String, Object> metadata;
+
+    Write(Class<T> type) {
+      this(
+          null,
+          null,
+          "",
+          0,
+          DEFAULT_SHARD_TEMPLATE,
+          type,
+          null,
+          DEFAULT_CODEC,
+          ImmutableMap.<String, Object>of(),
+          false,
+          null);
+    }
+
+    Write(
+        String name,
+        String filenamePrefix,
+        String filenameSuffix,
+        int numShards,
+        String shardTemplate,
+        Class<T> type,
+        Schema schema,
+        SerializableAvroCodecFactory codec,
+        Map<String, Object> metadata,
+        boolean windowedWrites,
+        FileBasedSink.FilenamePolicy filenamePolicy) {
+      super(name);
+      this.filenamePrefix = filenamePrefix;
+      this.filenameSuffix = filenameSuffix;
+      this.numShards = numShards;
+      this.shardTemplate = shardTemplate;
+      this.type = type;
+      this.schema = schema;
+      this.codec = codec;
+      this.windowedWrites = windowedWrites;
+      this.filenamePolicy = filenamePolicy;
+      // Collect every metadata entry that is not String/Long/byte[] (incl. null) to report at once.
+      Map<String, String> badKeys = Maps.newLinkedHashMap();
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        Object v = entry.getValue();
+        if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
+          badKeys.put(entry.getKey(), v == null ? "null" : v.getClass().getSimpleName());
+        }
+      }
+      checkArgument(
+          badKeys.isEmpty(),
+          "Metadata value type must be one of String, Long, or byte[]. Found %s", badKeys);
+      this.metadata = ImmutableMap.copyOf(metadata);
+    }
+
+    /**
+     * Writes to the file(s) with the given prefix. This can be a local filename
      * (if running locally), or a Google Cloud Storage filename of
      * the form {@code "gs://<bucket>/<filepath>"}
      * (if running locally or using remote execution).
      *
      * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards}, and end
-     * in a common extension, if given by {@link Bound#withSuffix}.
+     * a shard identifier (see {@link #withNumShards}, and end
+     * in a common extension, if given by {@link #withSuffix}.
      */
-    public static Bound<GenericRecord> to(String prefix) {
-      return new Bound<>(GenericRecord.class).to(prefix);
+    public Write<T> to(String filenamePrefix) {
+      validateOutputComponent(filenamePrefix);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * Returns a {@link PTransform} that writes to the file(s) specified by the provided
-     * {@link FileBasedSink.FilenamePolicy}.
-     */
-    public static Bound<GenericRecord> to(FileBasedSink.FilenamePolicy filenamePolicy) {
-      return new Bound<>(GenericRecord.class).to(filenamePolicy);
+    /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
+    public Write<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes to the file(s) with the
-     * given filename suffix.
+     * Writes to the file(s) with the given filename suffix.
+     *
+     * <p>See {@link ShardNameTemplate} for a description of shard templates.
      */
-    public static Bound<GenericRecord> withSuffix(String filenameSuffix) {
-      return new Bound<>(GenericRecord.class).withSuffix(filenameSuffix);
+    public Write<T> withSuffix(String filenameSuffix) {
+      validateOutputComponent(filenameSuffix);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that uses the provided shard count.
+     * Uses the provided shard count.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -275,585 +399,271 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
+     * @see ShardNameTemplate
      */
-    public static Bound<GenericRecord> withNumShards(int numShards) {
-      return new Bound<>(GenericRecord.class).withNumShards(numShards);
+    public Write<T> withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that uses the given shard name
-     * template.
+     * Returns a new {@link PTransform} that's like this one but
+     * that uses the given shard name template.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * @see ShardNameTemplate
      */
-    public static Bound<GenericRecord> withShardNameTemplate(String shardTemplate) {
-      return new Bound<>(GenericRecord.class).withShardNameTemplate(shardTemplate);
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that forces a single file as
-     * output.
+     * Forces a single file as output.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>This is a shortcut for {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public static Bound<GenericRecord> withoutSharding() {
-      return new Bound<>(GenericRecord.class).withoutSharding();
+    public Write<T> withoutSharding() {
+      return withNumShards(1).withShardNameTemplate("");
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records whose type is the specified Avro-generated class.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static <T> Bound<T> withSchema(Class<T> type) {
-      return new Bound<>(type).withSchema(type);
+    public Write<T> withWindowedWrites() {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          true,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records of the specified schema.
+     * Writes to Avro file(s) containing records whose type is the specified Avro-generated class.
      */
-    public static Bound<GenericRecord> withSchema(Schema schema) {
-      return new Bound<>(GenericRecord.class).withSchema(schema);
+    public Write<T> withSchema(Class<T> type) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          ReflectData.get().getSchema(type),
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s)
-     * containing records of the specified schema in a JSON-encoded
-     * string form.
-     */
-    public static Bound<GenericRecord> withSchema(String schema) {
-      return withSchema((new Schema.Parser()).parse(schema));
+    /** Writes to Avro file(s) containing records of the specified schema. */
+    public Write<GenericRecord> withSchema(Schema schema) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          GenericRecord.class,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s) that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
+     * Writes to Avro file(s) containing records of the specified schema in a JSON-encoded string
+     * form.
      */
-    public static Bound<GenericRecord> withoutValidation() {
-      return new Bound<>(GenericRecord.class).withoutValidation();
+    public Write<GenericRecord> withSchema(String schema) {
+      return withSchema((new Schema.Parser()).parse(schema));
     }
 
-    /**
-     * Returns a {@link PTransform} that writes Avro file(s) using specified codec.
-     */
-    public static Bound<GenericRecord> withCodec(CodecFactory codec) {
-      return new Bound<>(GenericRecord.class).withCodec(codec);
+    /** Writes to Avro file(s) compressed using specified codec. */
+    public Write<T> withCodec(CodecFactory codec) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          new SerializableAvroCodecFactory(codec),
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
     /**
-     * Returns a {@link PTransform} that writes Avro file(s) with the specified metadata.
+     * Writes to Avro file(s) with the specified metadata.
      *
      * <p>Supported value types are String, Long, and byte[].
      */
-    public static Bound<GenericRecord> withMetadata(Map<String, Object> metadata) {
-      return new Bound<>(GenericRecord.class).withMetadata(metadata);
+    public Write<T> withMetadata(Map<String, Object> metadata) {
+      return new Write<>(
+          name,
+          filenamePrefix,
+          filenameSuffix,
+          numShards,
+          shardTemplate,
+          type,
+          schema,
+          codec,
+          metadata,
+          windowedWrites,
+          filenamePolicy);
     }
 
-    /**
-     * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
-     * multiple Avro files matching a sharding pattern).
-     *
-     * @param <T> the type of each of the elements of the input PCollection
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-      private static final SerializableAvroCodecFactory DEFAULT_CODEC =
-          new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
-      // This should be a multiple of 4 to not get a partial encoded byte.
-      private static final int METADATA_BYTES_MAX_LENGTH = 40;
-
-      /** The filename to write to. */
-      @Nullable
-      final String filenamePrefix;
-      /** Suffix to use for each filename. */
-      final String filenameSuffix;
-      /** Requested number of shards. 0 for automatic. */
-      final int numShards;
-      /** Shard template string. */
-      final String shardTemplate;
-      /** The class type of the records. */
-      final Class<T> type;
-      /** The schema of the output file. */
-      @Nullable
-      final Schema schema;
-      final boolean windowedWrites;
-      FileBasedSink.FilenamePolicy filenamePolicy;
-
-      /**
-       * The codec used to encode the blocks in the Avro file. String value drawn from those in
-       * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
-       */
-      final SerializableAvroCodecFactory codec;
-      /** Avro file metadata. */
-      final ImmutableMap<String, Object> metadata;
-
-      Bound(Class<T> type) {
-        this(
-            null,
-            null,
-            "",
-            0,
-            DEFAULT_SHARD_TEMPLATE,
-            type,
-            null,
-            DEFAULT_CODEC,
-            ImmutableMap.<String, Object>of(),
-            false,
-            null);
-      }
-
-      Bound(
-          String name,
-          String filenamePrefix,
-          String filenameSuffix,
-          int numShards,
-          String shardTemplate,
-          Class<T> type,
-          Schema schema,
-          SerializableAvroCodecFactory codec,
-          Map<String, Object> metadata,
-          boolean windowedWrites,
-          FileBasedSink.FilenamePolicy filenamePolicy) {
-        super(name);
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.type = type;
-        this.schema = schema;
-        this.codec = codec;
-        this.windowedWrites = windowedWrites;
-        this.filenamePolicy = filenamePolicy;
-
-        Map<String, String> badKeys = Maps.newLinkedHashMap();
-        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-          Object v = entry.getValue();
-          if (!(v instanceof String || v instanceof Long || v instanceof byte[])) {
-            badKeys.put(entry.getKey(), v.getClass().getSimpleName());
-          }
-        }
-        checkArgument(
-            badKeys.isEmpty(),
-            "Metadata value type must be one of String, Long, or byte[]. Found {}", badKeys);
-        this.metadata = ImmutableMap.copyOf(metadata);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link AvroIO.Write#to(String)} for more information
-       * about filenames.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      public Bound<T> to(FileBasedSink.FilenamePolicy filenamePolicy) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>See {@link ShardNameTemplate} for a description of shard templates.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withSuffix(String filenameSuffix) {
-        validateOutputComponent(filenameSuffix);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withNumShards(int numShards) {
-        checkArgument(numShards >= 0);
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutSharding() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            1,
-            "",
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
-      }
-
-      public Bound<T> withWindowedWrites() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            true,
-            filenamePolicy);
-      }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records whose type is the
-       * specified Avro-generated class.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input PCollection
-       */
-      public <X> Bound<X> withSchema(Class<X> type) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            ReflectData.get().getSchema(type),
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+    @Override
+    public PDone expand(PCollection<T> input) {
+      if (filenamePolicy == null && filenamePrefix == null) {
+        throw new IllegalStateException(
+            "need to set the filename prefix of an AvroIO.Write transform");
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records of the specified
-       * schema.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(Schema schema) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            GenericRecord.class,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (filenamePolicy != null && filenamePrefix != null) {
+        throw new IllegalStateException(
+            "cannot set both a filename policy and a filename prefix");
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) containing records of the specified
-       * schema in a JSON-encoded string form.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<GenericRecord> withSchema(String schema) {
-        return withSchema((new Schema.Parser()).parse(schema));
+      if (schema == null) {
+        throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
       }
 
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      WriteFiles<T> write = null;
+      if (filenamePolicy != null) {
+        write = WriteFiles.to(
+            new AvroSink<>(
+                filenamePolicy,
+                AvroCoder.of(type, schema),
+                codec,
+                metadata));
+      } else {
+        write = WriteFiles.to(
+            new AvroSink<>(
+                filenamePrefix,
+                filenameSuffix,
+                shardTemplate,
+                AvroCoder.of(type, schema),
+                codec,
+                metadata));
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) compressed using specified codec.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withCodec(CodecFactory codec) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            new SerializableAvroCodecFactory(codec),
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (getNumShards() > 0) {
+        write = write.withNumShards(getNumShards());
       }
-
-      /**
-       * Returns a new {@link PTransform} that's like this one but
-       * that writes to Avro file(s) with the specified metadata.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withMetadata(Map<String, Object> metadata) {
-        return new Bound<>(
-            name,
-            filenamePrefix,
-            filenameSuffix,
-            numShards,
-            shardTemplate,
-            type,
-            schema,
-            codec,
-            metadata,
-            windowedWrites,
-            filenamePolicy);
+      if (windowedWrites) {
+        write = write.withWindowedWrites();
       }
+      return input.apply("Write", write);
+    }
 
-      @Override
-      public PDone expand(PCollection<T> input) {
-        if (filenamePolicy == null && filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of an AvroIO.Write transform");
-        }
-        if (filenamePolicy != null && filenamePrefix != null) {
-          throw new IllegalStateException(
-              "cannot set both a filename policy and a filename prefix");
-        }
-        if (schema == null) {
-          throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
-        }
-
-        WriteFiles<T> write = null;
-        if (filenamePolicy != null) {
-          write = WriteFiles.to(
-              new AvroSink<>(
-                  filenamePolicy,
-                  AvroCoder.of(type, schema),
-                  codec,
-                  metadata));
-        } else {
-          write = WriteFiles.to(
-              new AvroSink<>(
-                  filenamePrefix,
-                  filenameSuffix,
-                  shardTemplate,
-                  AvroCoder.of(type, schema),
-                  codec,
-                  metadata));
-        }
-        if (getNumShards() > 0) {
-          write = write.withNumShards(getNumShards());
-        }
-        if (windowedWrites) {
-          write = write.withWindowedWrites();
-        }
-        return input.apply("Write", write);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(DisplayData.item("schema", type)
+            .withLabel("Record Schema"))
+          .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+            .withLabel("Output File Prefix"))
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+              .withLabel("Output Shard Name Template"),
+              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+              .withLabel("Output File Suffix"),
+              "")
+          .addIfNotDefault(DisplayData.item("numShards", numShards)
+              .withLabel("Maximum Output Shards"),
+              0)
+          .addIfNotDefault(DisplayData.item("codec", codec.toString())
+              .withLabel("Avro Compression Codec"),
+              DEFAULT_CODEC.toString());
+      builder.include("Metadata", new Metadata());
+    }
 
+    private class Metadata implements HasDisplayData {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .add(DisplayData.item("schema", type)
-              .withLabel("Record Schema"))
-            .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
-              .withLabel("Output File Prefix"))
-            .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
-                .withLabel("Output Shard Name Template"),
-                DEFAULT_SHARD_TEMPLATE)
-            .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
-                .withLabel("Output File Suffix"),
-                "")
-            .addIfNotDefault(DisplayData.item("numShards", numShards)
-                .withLabel("Maximum Output Shards"),
-                0)
-            .addIfNotDefault(DisplayData.item("codec", codec.toString())
-                .withLabel("Avro Compression Codec"),
-                DEFAULT_CODEC.toString());
-        builder.include("Metadata", new Metadata());
-      }
-
-      private class Metadata implements HasDisplayData {
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-            DisplayData.Type type = DisplayData.inferType(entry.getValue());
-            if (type != null) {
-              builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
-            } else {
-              String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
-              String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
-                  ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
-              builder.add(DisplayData.item(entry.getKey(), repr));
-            }
+        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+          DisplayData.Type type = DisplayData.inferType(entry.getValue());
+          if (type != null) {
+            builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+          } else {
+            String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+            String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
+                ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+            builder.add(DisplayData.item(entry.getKey(), repr));
           }
         }
       }
+    }
 
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    /**
+     * Returns the current shard name template string.
+     */
+    public String getShardNameTemplate() {
+      return shardTemplate;
+    }
 
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
+    public String getFilenamePrefix() {
+      return filenamePrefix;
+    }
 
-      public int getNumShards() {
-        return numShards;
-      }
+    public String getShardTemplate() {
+      return shardTemplate;
+    }
 
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
+    public int getNumShards() {
+      return numShards;
+    }
 
-      public Class<T> getType() {
-        return type;
-      }
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
 
-      public Schema getSchema() {
-        return schema;
-      }
+    public Class<T> getType() {
+      return type;
+    }
 
-      public CodecFactory getCodec() {
-        return codec.getCodec();
-      }
+    public Schema getSchema() {
+      return schema;
+    }
 
-      public Map<String, Object> getMetadata() {
-        return metadata;
-      }
+    public CodecFactory getCodec() {
+      return codec.getCodec();
     }
 
-    /** Disallow construction of utility class. */
-    private Write() {}
+    public Map<String, Object> getMetadata() {
+      return metadata;
+    }
   }
 
   // Pattern which matches old-style shard output patterns, which are now

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 2144b0d..7df1b18 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -51,7 +51,6 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.io.AvroIO.Write.Bound;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -104,7 +103,7 @@ public class AvroIOTest {
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.read().from("gs://bucket/foo*/baz").getName());
-    assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
+    assertEquals("AvroIO.Write", AvroIO.write().to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)
@@ -145,7 +144,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
           .withoutSharding()
           .withSchema(GenericClass.class));
     p.run();
@@ -169,7 +168,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withCodec(CodecFactory.deflateCodec(9))
             .withSchema(GenericClass.class));
@@ -196,7 +195,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withSchema(GenericClass.class)
             .withCodec(CodecFactory.nullCodec()));
@@ -264,7 +263,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-      .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+      .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
           .withoutSharding()
           .withSchema(GenericClass.class));
     p.run();
@@ -372,7 +371,7 @@ public class AvroIOTest {
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+        .apply(AvroIO.<GenericClass>write().to(new WindowedFilenamePolicy(outputFilePrefix))
             .withWindowedWrites()
             .withNumShards(2)
             .withSchema(GenericClass.class));
@@ -407,14 +406,14 @@ public class AvroIOTest {
 
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz");
     assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.snappyCodec());
     assertEquals(SNAPPY_CODEC, write.getCodec().toString());
@@ -423,11 +422,11 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomDeflateCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.deflateCodec(9));
 
-    AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
 
     assertEquals(CodecFactory.deflateCodec(9).toString(), serdeWrite.getCodec().toString());
   }
@@ -435,11 +434,11 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   public void testWriteWithSerDeCustomXZCodec() throws Exception {
-    AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
+    AvroIO.Write<GenericRecord> write = AvroIO.<GenericRecord>write()
         .to("gs://bucket/foo/baz")
         .withCodec(CodecFactory.xzCodec(9));
 
-    AvroIO.Write.Bound<GenericRecord> serdeWrite = SerializableUtils.clone(write);
+    AvroIO.Write<GenericRecord> serdeWrite = SerializableUtils.clone(write);
 
     assertEquals(CodecFactory.xzCodec(9).toString(), serdeWrite.getCodec().toString());
   }
@@ -453,7 +452,7 @@ public class AvroIOTest {
     File outputFile = tmpFolder.newFile("output.avro");
 
     p.apply(Create.of(values))
-        .apply(AvroIO.Write.to(outputFile.getAbsolutePath())
+        .apply(AvroIO.<GenericClass>write().to(outputFile.getAbsolutePath())
             .withoutSharding()
             .withSchema(GenericClass.class)
             .withMetadata(ImmutableMap.<String, Object>of(
@@ -475,7 +474,8 @@ public class AvroIOTest {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
-    Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
+    AvroIO.Write<String> write =
+        AvroIO.<String>write().to(outputFilePrefix).withSchema(String.class);
     if (numShards > 1) {
       System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
@@ -556,7 +556,7 @@ public class AvroIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    AvroIO.Write.Bound<?> write = AvroIO.Write
+    AvroIO.Write<?> write = AvroIO.<GenericClass>write()
         .to("foo")
         .withShardNameTemplate("-SS-of-NN-")
         .withSuffix("bar")
@@ -584,10 +584,9 @@ public class AvroIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    AvroIO.Write.Bound<?> write = AvroIO.Write
+    AvroIO.Write<?> write = AvroIO.<GenericRecord>write()
         .to(outputPath)
-        .withSchema(Schema.create(Schema.Type.STRING))
-        .withoutValidation();
+        .withSchema(Schema.create(Schema.Type.STRING));
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("AvroIO.Write should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/d1dfd4e2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
index b974663..ba7f1b9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java
@@ -271,16 +271,16 @@ public class AvroIOTransformTest {
           ImmutableList.<Object[]>builder()
               .add(
                   new Object[] {
-                      AvroIO.Write.withSchema(AvroGeneratedUser.class),
+                      AvroIO.<AvroGeneratedUser>write().withSchema(AvroGeneratedUser.class),
                       generatedClass
                   },
                   new Object[] {
-                      AvroIO.Write.withSchema(SCHEMA),
+                      AvroIO.write().withSchema(SCHEMA),
                       fromSchema
                   },
 
                   new Object[] {
-                      AvroIO.Write.withSchema(SCHEMA_STRING),
+                      AvroIO.write().withSchema(SCHEMA_STRING),
                       fromSchemaString
                   })
               .build();
@@ -288,17 +288,17 @@ public class AvroIOTransformTest {
 
     @SuppressWarnings("DefaultAnnotationParam")
     @Parameterized.Parameter(0)
-    public AvroIO.Write.Bound writeTransform;
+    public AvroIO.Write writeTransform;
 
     @Parameterized.Parameter(1)
     public String testAlias;
 
-    private <T> void runTestWrite(final AvroIO.Write.Bound<T> writeBuilder)
+    private <T> void runTestWrite(final AvroIO.Write<T> writeBuilder)
         throws Exception {
 
       final File avroFile = tmpFolder.newFile("file.avro");
       final AvroGeneratedUser[] users = generateAvroObjects();
-      final AvroIO.Write.Bound<T> write = writeBuilder.to(avroFile.getPath());
+      final AvroIO.Write<T> write = writeBuilder.to(avroFile.getPath());
 
       @SuppressWarnings("unchecked") final
       PCollection<T> input =


[05/11] beam git commit: Moves AvroSink to upper level

Posted by jk...@apache.org.
Moves AvroSink to upper level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0166e199
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0166e199
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0166e199

Branch: refs/heads/master
Commit: 0166e19991af956a48ef99310f5f1916225255aa
Parents: 2fa3c34
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:05:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 131 ----------------
 .../java/org/apache/beam/sdk/io/AvroSink.java   | 150 +++++++++++++++++++
 2 files changed, 150 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 2031569..75e14d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,33 +19,24 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 import java.util.Map;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -952,126 +943,4 @@ public class AvroIO {
 
   /** Disallow construction of utility class. */
   private AvroIO() {}
-
-  /**
-   * A {@link FileBasedSink} for Avro files.
-   */
-  @VisibleForTesting
-  static class AvroSink<T> extends FileBasedSink<T> {
-    private final AvroCoder<T> coder;
-    private final SerializableAvroCodecFactory codec;
-    private final ImmutableMap<String, Object> metadata;
-
-    @VisibleForTesting
-    AvroSink(
-        FilenamePolicy filenamePolicy,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
-      super(filenamePolicy);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
-    }
-
-    @VisibleForTesting
-    AvroSink(
-        String baseOutputFilename,
-        String extension,
-        String fileNameTemplate,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
-      super(baseOutputFilename, extension, fileNameTemplate);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation() {
-      return new AvroWriteOperation<>(this, coder, codec, metadata);
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for Avro files.
-     */
-    private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
-      private final AvroCoder<T> coder;
-      private final SerializableAvroCodecFactory codec;
-      private final ImmutableMap<String, Object> metadata;
-
-      private AvroWriteOperation(AvroSink<T> sink,
-                                 AvroCoder<T> coder,
-                                 SerializableAvroCodecFactory codec,
-                                 ImmutableMap<String, Object> metadata) {
-        super(sink);
-        this.coder = coder;
-        this.codec = codec;
-        this.metadata = metadata;
-      }
-
-      @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new AvroWriter<>(this, coder, codec, metadata);
-      }
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
-     * for Avro files.
-     */
-    private static class AvroWriter<T> extends FileBasedWriter<T> {
-      private final AvroCoder<T> coder;
-      private DataFileWriter<T> dataFileWriter;
-      private SerializableAvroCodecFactory codec;
-      private final ImmutableMap<String, Object> metadata;
-
-      public AvroWriter(FileBasedWriteOperation<T> writeOperation,
-                        AvroCoder<T> coder,
-                        SerializableAvroCodecFactory codec,
-                        ImmutableMap<String, Object> metadata) {
-        super(writeOperation, MimeTypes.BINARY);
-        this.coder = coder;
-        this.codec = codec;
-        this.metadata = metadata;
-      }
-
-      @SuppressWarnings("deprecation") // uses internal test functionality.
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
-            ? new GenericDatumWriter<T>(coder.getSchema())
-            : new ReflectDatumWriter<T>(coder.getSchema());
-
-        dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
-        for (Map.Entry<String, Object> entry : metadata.entrySet()) {
-          Object v = entry.getValue();
-          if (v instanceof String) {
-            dataFileWriter.setMeta(entry.getKey(), (String) v);
-          } else if (v instanceof Long) {
-            dataFileWriter.setMeta(entry.getKey(), (Long) v);
-          } else if (v instanceof byte[]) {
-            dataFileWriter.setMeta(entry.getKey(), (byte[]) v);
-          } else {
-            throw new IllegalStateException(
-                "Metadata value type must be one of String, Long, or byte[]. Found "
-                    + v.getClass().getSimpleName());
-          }
-        }
-        dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
-      }
-
-      @Override
-      public void write(T value) throws Exception {
-        dataFileWriter.append(value);
-      }
-
-      @Override
-      protected void finishWrite() throws Exception {
-        dataFileWriter.flush();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/0166e199/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
new file mode 100644
index 0000000..16f233c
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.Map;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * A {@link FileBasedSink} for Avro files.
+ */
+class AvroSink<T> extends FileBasedSink<T> {
+  private final AvroCoder<T> coder;
+  private final SerializableAvroCodecFactory codec;
+  private final ImmutableMap<String, Object> metadata;
+
+  AvroSink(
+      FilenamePolicy filenamePolicy,
+      AvroCoder<T> coder,
+      SerializableAvroCodecFactory codec,
+      ImmutableMap<String, Object> metadata) {
+    super(filenamePolicy);
+    this.coder = coder;
+    this.codec = codec;
+    this.metadata = metadata;
+  }
+
+  AvroSink(
+      String baseOutputFilename,
+      String extension,
+      String fileNameTemplate,
+      AvroCoder<T> coder,
+      SerializableAvroCodecFactory codec,
+      ImmutableMap<String, Object> metadata) {
+    super(baseOutputFilename, extension, fileNameTemplate);
+    this.coder = coder;
+    this.codec = codec;
+    this.metadata = metadata;
+  }
+
+  @Override
+  public FileBasedWriteOperation<T> createWriteOperation() {
+    return new AvroWriteOperation<>(this, coder, codec, metadata);
+  }
+
+  /**
+   * A {@link FileBasedWriteOperation
+   * FileBasedWriteOperation} for Avro files.
+   */
+  private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T> {
+    private final AvroCoder<T> coder;
+    private final SerializableAvroCodecFactory codec;
+    private final ImmutableMap<String, Object> metadata;
+
+    private AvroWriteOperation(AvroSink<T> sink,
+                               AvroCoder<T> coder,
+                               SerializableAvroCodecFactory codec,
+                               ImmutableMap<String, Object> metadata) {
+      super(sink);
+      this.coder = coder;
+      this.codec = codec;
+      this.metadata = metadata;
+    }
+
+    @Override
+    public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
+      return new AvroWriter<>(this, coder, codec, metadata);
+    }
+  }
+
+  /**
+   * A {@link FileBasedWriter FileBasedWriter}
+   * for Avro files.
+   */
+  private static class AvroWriter<T> extends FileBasedWriter<T> {
+    private final AvroCoder<T> coder;
+    private DataFileWriter<T> dataFileWriter;
+    private SerializableAvroCodecFactory codec;
+    private final ImmutableMap<String, Object> metadata;
+
+    public AvroWriter(FileBasedWriteOperation<T> writeOperation,
+                      AvroCoder<T> coder,
+                      SerializableAvroCodecFactory codec,
+                      ImmutableMap<String, Object> metadata) {
+      super(writeOperation, MimeTypes.BINARY);
+      this.coder = coder;
+      this.codec = codec;
+      this.metadata = metadata;
+    }
+
+    @SuppressWarnings("deprecation") // uses internal test functionality.
+    @Override
+    protected void prepareWrite(WritableByteChannel channel) throws Exception {
+      DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
+          ? new GenericDatumWriter<T>(coder.getSchema())
+          : new ReflectDatumWriter<T>(coder.getSchema());
+
+      dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        Object v = entry.getValue();
+        if (v instanceof String) {
+          dataFileWriter.setMeta(entry.getKey(), (String) v);
+        } else if (v instanceof Long) {
+          dataFileWriter.setMeta(entry.getKey(), (Long) v);
+        } else if (v instanceof byte[]) {
+          dataFileWriter.setMeta(entry.getKey(), (byte[]) v);
+        } else {
+          throw new IllegalStateException(
+              "Metadata value type must be one of String, Long, or byte[]. Found "
+                  + v.getClass().getSimpleName());
+        }
+      }
+      dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+    }
+
+    @Override
+    public void write(T value) throws Exception {
+      dataFileWriter.append(value);
+    }
+
+    @Override
+    protected void finishWrite() throws Exception {
+      dataFileWriter.flush();
+    }
+  }
+}


[03/11] beam git commit: Converts AvroIO.Read to AutoValue

Posted by jk...@apache.org.
Converts AvroIO.Read to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/439f2ca0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/439f2ca0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/439f2ca0

Branch: refs/heads/master
Commit: 439f2ca03c0d8994e5736b9493f61d9cb4267cb2
Parents: ff7a1d4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 18:37:49 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Mon May 1 18:43:38 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 67 +++++++++-----------
 1 file changed, 29 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/439f2ca0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index ed172d1..2f1d917 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
@@ -130,12 +131,15 @@ public class AvroIO {
    * <p>The schema must be specified using one of the {@code withSchema} functions.
    */
   public static <T> Read<T> read() {
-    return new Read<>();
+    return new AutoValue_AvroIO_Read.Builder<T>().build();
   }
 
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
-    return new Read<>(null, null, GenericRecord.class, schema);
+    return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
   }
 
   /**
@@ -146,26 +150,21 @@ public class AvroIO {
   }
 
   /** Implementation of {@link #read}. */
-  public static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-    /** The filepattern to read from. */
-    @Nullable
-    final String filepattern;
-    /** The class type of the records. */
-    @Nullable
-    final Class<T> type;
-    /** The schema of the input file. */
-    @Nullable
-    final Schema schema;
-
-    Read() {
-      this(null, null, null, null);
-    }
+  @AutoValue
+  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
+    @Nullable abstract String getFilepattern();
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+
+    abstract Builder<T> toBuilder();
 
-    Read(String name, String filepattern, Class<T> type, Schema schema) {
-      super(name);
-      this.filepattern = filepattern;
-      this.type = type;
-      this.schema = schema;
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilepattern(String filepattern);
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+
+      abstract Read<T> build();
     }
 
     /**
@@ -178,7 +177,7 @@ public class AvroIO {
      * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
     public Read<T> from(String filepattern) {
-      return new Read<>(name, filepattern, type, schema);
+      return toBuilder().setFilepattern(filepattern).build();
     }
 
     /**
@@ -187,26 +186,26 @@ public class AvroIO {
      * specified Avro-generated class.
      */
     public Read<T> withSchema(Class<T> type) {
-      return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type));
+      return toBuilder().setRecordClass(type).setSchema(ReflectData.get().getSchema(type)).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
-      if (filepattern == null) {
+      if (getFilepattern() == null) {
         throw new IllegalStateException(
             "need to set the filepattern of an AvroIO.Read transform");
       }
-      if (schema == null) {
+      if (getSchema() == null) {
         throw new IllegalStateException("need to set the schema of an AvroIO.Read transform");
       }
 
       @SuppressWarnings("unchecked")
       Bounded<T> read =
-          type == GenericRecord.class
+          getRecordClass() == GenericRecord.class
               ? (Bounded<T>) org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(schema))
+                  AvroSource.from(getFilepattern()).withSchema(getSchema()))
               : org.apache.beam.sdk.io.Read.from(
-                  AvroSource.from(filepattern).withSchema(type));
+                  AvroSource.from(getFilepattern()).withSchema(getRecordClass()));
 
       PCollection<T> pcol = input.getPipeline().apply("Read", read);
       // Honor the default output coder that would have been used by this PTransform.
@@ -218,21 +217,13 @@ public class AvroIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-        .addIfNotNull(DisplayData.item("filePattern", filepattern)
+        .addIfNotNull(DisplayData.item("filePattern", getFilepattern())
           .withLabel("Input File Pattern"));
     }
 
     @Override
     protected Coder<T> getDefaultOutputCoder() {
-      return AvroCoder.of(type, schema);
-    }
-
-    public String getFilepattern() {
-      return filepattern;
-    }
-
-    public Schema getSchema() {
-      return schema;
+      return AvroCoder.of(getRecordClass(), getSchema());
     }
   }