You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/18 21:21:31 UTC

[5/6] beam git commit: Gets rid of TFRecordIO.Write.Bound

Gets rid of TFRecordIO.Write.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76de0e46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76de0e46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76de0e46

Branch: refs/heads/master
Commit: 76de0e4609670070f8f8fe479e4919b538143d15
Parents: f5d92a4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 17:38:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 14:03:42 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 407 +++++++------------
 .../org/apache/beam/sdk/io/TFRecordIOTest.java  |   6 +-
 2 files changed, 154 insertions(+), 259 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/76de0e46/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index fb4ff5b..13fd4d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -66,6 +66,15 @@ public class TFRecordIO {
         .build();
   }
 
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to a TFRecord file (or
+   * multiple TFRecord files matching a sharding pattern), with each
+   * element of the input collection encoded into its own record.
+   */
+  public static Write write() {
+    return new Write();
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<byte[]>> {
@@ -204,45 +213,80 @@ public class TFRecordIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to TFRecord file (or
-   * multiple TFRecord files matching a sharding pattern), with each
-   * element of the input collection encoded into its own record.
-   */
-  public static class Write {
+  /** Implementation of {@link #write}. */
+  public static class Write extends PTransform<PCollection<byte[]>, PDone> {
+    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+    /** The prefix of each file written, combined with suffix and shardTemplate. */
+    private final ValueProvider<String> filenamePrefix;
+    /** The suffix of each file written, combined with prefix and shardTemplate. */
+    private final String filenameSuffix;
+
+    /** Requested number of shards. 0 for automatic. */
+    private final int numShards;
+
+    /** The shard template of each file written, combined with prefix and suffix. */
+    private final String shardTemplate;
+
+    /** An option to indicate if output validation is desired. Default is true. */
+    private final boolean validate;
+
+    /** Option to indicate the output sink's compression type. Default is NONE. */
+    private final TFRecordIO.CompressionType compressionType;
+
+    private Write() {
+      this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
+    }
+
+    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+                 int numShards, String shardTemplate, boolean validate,
+                 CompressionType compressionType) {
+      super(name);
+      this.filenamePrefix = filenamePrefix;
+      this.filenameSuffix = filenameSuffix;
+      this.numShards = numShards;
+      this.shardTemplate = shardTemplate;
+      this.validate = validate;
+      this.compressionType = compressionType;
+    }
 
     /**
-     * Returns a transform for writing to TFRecord files that writes to the file(s)
-     * with the given prefix. This can be a local filename
+     * Writes to TFRecord file(s) with the given prefix. This can be a local filename
      * (if running locally), or a Google Cloud Storage filename of
      * the form {@code "gs://<bucket>/<filepath>"}
      * (if running locally or using remote execution).
      *
      * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link TFRecordIO.Write.Bound#withNumShards(int)}, and end
-     * in a common extension, if given by {@link TFRecordIO.Write.Bound#withSuffix(String)}.
+     * a shard identifier (see {@link #withNumShards(int)}), and end
+     * in a common extension, if given by {@link #withSuffix(String)}.
      */
-    public static Bound to(String prefix) {
-      return new Bound().to(prefix);
+    public Write to(String filenamePrefix) {
+      validateOutputComponent(filenamePrefix);
+      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
+          shardTemplate, validate, compressionType);
     }
 
     /**
      * Like {@link #to(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound to(ValueProvider<String> prefix) {
-      return new Bound().to(prefix);
+    public Write to(ValueProvider<String> filenamePrefix) {
+      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+          compressionType);
     }
 
     /**
-     * Returns a transform for writing to TFRecord files that appends the specified suffix
-     * to the created files.
+     * Writes to the file(s) with the given filename suffix.
+     *
+     * @see ShardNameTemplate
      */
-    public static Bound withSuffix(String nameExtension) {
-      return new Bound().withSuffix(nameExtension);
+    public Write withSuffix(String nameExtension) {
+      validateOutputComponent(nameExtension);
+      return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
+          compressionType);
     }
 
     /**
-     * Returns a transform for writing to TFRecord files that uses the provided shard count.
+     * Writes to the provided number of shards.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -250,280 +294,131 @@ public class TFRecordIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
+     * @see ShardNameTemplate
      */
-    public static Bound withNumShards(int numShards) {
-      return new Bound().withNumShards(numShards);
+    public Write withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+          compressionType);
     }
 
     /**
-     * Returns a transform for writing to TFRecord files that uses the given shard name
-     * template.
+     * Uses the given shard name template.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * @see ShardNameTemplate
      */
-    public static Bound withShardNameTemplate(String shardTemplate) {
-      return new Bound().withShardNameTemplate(shardTemplate);
+    public Write withShardNameTemplate(String shardTemplate) {
+      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+          compressionType);
     }
 
     /**
-     * Returns a transform for writing to TFRecord files that forces a single file as
-     * output.
+     * Forces a single file as output.
+     *
+     * <p>Constraining the number of shards is likely to reduce
+     * the performance of a pipeline. Using this setting is not recommended
+     * unless you truly require a single output file.
+     *
+     * <p>This is a shortcut for
+     * {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public static Bound withoutSharding() {
-      return new Bound().withoutSharding();
+    public Write withoutSharding() {
+      return new Write(name, filenamePrefix, filenameSuffix, 1, "",
+          validate, compressionType);
     }
 
     /**
-     * Returns a transform for writing to text files that has GCS path validation on
-     * pipeline creation disabled.
+     * Disables GCS output path validation on pipeline creation.
      *
      * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
+     * not exist at the pipeline creation time, but is expected to be
+     * available at execution time.
      */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
+    public Write withoutValidation() {
+      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
+          compressionType);
     }
 
     /**
-     * Returns a transform for writing to TFRecord files like this one but writes to output files
-     * using the specified compression type.
+     * Writes to output files using the specified compression type.
      *
      * <p>If no compression type is specified, the default is
      * {@link TFRecordIO.CompressionType#NONE}.
      * See {@link TFRecordIO.Read#withCompressionType} for more details.
      */
-    public static Bound withCompressionType(CompressionType compressionType) {
-      return new Bound().withCompressionType(compressionType);
+    public Write withCompressionType(CompressionType compressionType) {
+      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+          compressionType);
     }
 
-    /**
-     * A PTransform that writes a bounded PCollection to a TFRecord file (or
-     * multiple TFRecord files matching a sharding pattern), with each
-     * PCollection element being encoded into its own record.
-     */
-    public static class Bound extends PTransform<PCollection<byte[]>, PDone> {
-      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
-      /** The prefix of each file written, combined with suffix and shardTemplate. */
-      private final ValueProvider<String> filenamePrefix;
-      /** The suffix of each file written, combined with prefix and shardTemplate. */
-      private final String filenameSuffix;
-
-      /** Requested number of shards. 0 for automatic. */
-      private final int numShards;
-
-      /** The shard template of each file written, combined with prefix and suffix. */
-      private final String shardTemplate;
-
-      /** An option to indicate if output validation is desired. Default is true. */
-      private final boolean validate;
-
-      /** Option to indicate the output sink's compression type. Default is NONE. */
-      private final TFRecordIO.CompressionType compressionType;
-
-      private Bound() {
-        this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
-      }
-
-      private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-                   int numShards, String shardTemplate, boolean validate,
-                   CompressionType compressionType) {
-        super(name);
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.validate = validate;
-        this.compressionType = compressionType;
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link TFRecordIO.Write#to(String) Write.to(String)} for more information.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
-            shardTemplate, validate, compressionType);
-      }
-
-      /**
-       * Like {@link #to(String)}, but with a {@link ValueProvider}.
-       */
-      public Bound to(ValueProvider<String> filenamePrefix) {
-        return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-            compressionType);
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withSuffix(String nameExtension) {
-        validateOutputComponent(nameExtension);
-        return new Bound(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
-            compressionType);
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound withNumShards(int numShards) {
-        checkArgument(numShards >= 0);
-        return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-            compressionType);
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withShardNameTemplate(String shardTemplate) {
-        return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-            compressionType);
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutSharding() {
-        return new Bound(name, filenamePrefix, filenameSuffix, 1, "",
-            validate, compressionType);
-      }
-
-      /**
-       * Returns a transform for writing to TFRecord files that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
-            compressionType);
+    @Override
+    public PDone expand(PCollection<byte[]> input) {
+      if (filenamePrefix == null) {
+        throw new IllegalStateException(
+            "need to set the filename prefix of a TFRecordIO.Write transform");
       }
-
-      /**
-       * Returns a transform for writing to TFRecord files like this one but writes to output files
-       * using the specified compression type.
-       *
-       * <p>If no compression type is specified, the default is
-       * {@link TFRecordIO.CompressionType#NONE}.
-       * See {@link TFRecordIO.Read#withCompressionType} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withCompressionType(CompressionType compressionType) {
-        return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-            compressionType);
+      org.apache.beam.sdk.io.Write<byte[]> write =
+          org.apache.beam.sdk.io.Write.to(
+              new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
+      if (getNumShards() > 0) {
+        write = write.withNumShards(getNumShards());
       }
+      return input.apply("Write", write);
+    }
 
-      @Override
-      public PDone expand(PCollection<byte[]> input) {
-        if (filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of a TFRecordIO.Write transform");
-        }
-        org.apache.beam.sdk.io.Write<byte[]> write =
-            org.apache.beam.sdk.io.Write.to(
-                new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
-        if (getNumShards() > 0) {
-          write = write.withNumShards(getNumShards());
-        }
-        return input.apply("Write", write);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        String prefixString = filenamePrefix.isAccessible()
-            ? filenamePrefix.get() : filenamePrefix.toString();
-        builder
-            .addIfNotNull(DisplayData.item("filePrefix", prefixString)
-                .withLabel("Output File Prefix"))
-            .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
-                .withLabel("Output File Suffix"), "")
-            .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
-                    .withLabel("Output Shard Name Template"),
-                DEFAULT_SHARD_TEMPLATE)
-            .addIfNotDefault(DisplayData.item("validation", validate)
-                .withLabel("Validation Enabled"), true)
-            .addIfNotDefault(DisplayData.item("numShards", numShards)
-                .withLabel("Maximum Output Shards"), 0)
-            .add(DisplayData
-                .item("compressionType", compressionType.toString())
-                .withLabel("Compression Type"));
-      }
+      String prefixString = filenamePrefix.isAccessible()
+          ? filenamePrefix.get() : filenamePrefix.toString();
+      builder
+          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+              .withLabel("Output File Prefix"))
+          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+              .withLabel("Output File Suffix"), "")
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+                  .withLabel("Output Shard Name Template"),
+              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotDefault(DisplayData.item("validation", validate)
+              .withLabel("Validation Enabled"), true)
+          .addIfNotDefault(DisplayData.item("numShards", numShards)
+              .withLabel("Maximum Output Shards"), 0)
+          .add(DisplayData
+              .item("compressionType", compressionType.toString())
+              .withLabel("Compression Type"));
+    }
 
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
+    /**
+     * Returns the current shard name template string.
+     */
+    public String getShardNameTemplate() {
+      return shardTemplate;
+    }
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      public String getFilenamePrefix() {
-        return filenamePrefix.get();
-      }
+    public String getFilenamePrefix() {
+      return filenamePrefix.get();
+    }
 
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
+    public String getShardTemplate() {
+      return shardTemplate;
+    }
 
-      public int getNumShards() {
-        return numShards;
-      }
+    public int getNumShards() {
+      return numShards;
+    }
 
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
 
-      public boolean needsValidation() {
-        return validate;
-      }
+    public boolean needsValidation() {
+      return validate;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/76de0e46/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 2a455d1..9511c2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -158,7 +158,7 @@ public class TFRecordIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    TFRecordIO.Write.Bound write = TFRecordIO.Write
+    TFRecordIO.Write write = TFRecordIO.write()
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
@@ -255,7 +255,7 @@ public class TFRecordIOTest {
     PCollection<byte[]> input = p.apply(Create.of(Arrays.asList(elems)))
         .apply(ParDo.of(new StringToByteArray()));
 
-    TFRecordIO.Write.Bound write = TFRecordIO.Write.to(filename).withoutSharding();
+    TFRecordIO.Write write = TFRecordIO.write().to(filename).withoutSharding();
     input.apply(write);
 
     p.run();
@@ -329,7 +329,7 @@ public class TFRecordIOTest {
     Path baseDir = Files.createTempDirectory(tempFolder, "test-rt");
     String baseFilename = baseDir.resolve(outputName).toString();
 
-    TFRecordIO.Write.Bound write = TFRecordIO.Write.to(baseFilename)
+    TFRecordIO.Write write = TFRecordIO.write().to(baseFilename)
         .withNumShards(numShards)
         .withSuffix(suffix)
         .withCompressionType(writeCompressionType);