You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/18 21:21:28 UTC

[2/6] beam git commit: Converts TFRecordIO.Write to AutoValue

Converts TFRecordIO.Write to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b77b6fbd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b77b6fbd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b77b6fbd

Branch: refs/heads/master
Commit: b77b6fbd5654420ee3f1cf415ab9ee19d277ff0c
Parents: 76de0e4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 17:50:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 14:03:42 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 127 ++++++++-----------
 1 file changed, 54 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b77b6fbd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 13fd4d1..2e01a80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -72,7 +72,13 @@ public class TFRecordIO {
    * element of the input collection encoded into its own record.
    */
   public static Write write() {
-    return new Write();
+    return new AutoValue_TFRecordIO_Write.Builder()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setValidate(true)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setCompressionType(CompressionType.NONE)
+        .build();
   }
 
   /** Implementation of {@link #read}. */
@@ -214,40 +220,46 @@ public class TFRecordIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
-  public static class Write extends PTransform<PCollection<byte[]>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
     private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
     /** The prefix of each file written, combined with suffix and shardTemplate. */
-    private final ValueProvider<String> filenamePrefix;
+    @Nullable
+    abstract ValueProvider<String> getFilenamePrefix();
+
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    private final String filenameSuffix;
+    abstract String getFilenameSuffix();
 
     /** Requested number of shards. 0 for automatic. */
-    private final int numShards;
+    abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    private final String shardTemplate;
+    abstract String getShardTemplate();
 
     /** An option to indicate if output validation is desired. Default is true. */
-    private final boolean validate;
+    abstract boolean getValidate();
 
     /** Option to indicate the output sink's compression type. Default is NONE. */
-    private final TFRecordIO.CompressionType compressionType;
+    abstract CompressionType getCompressionType();
 
-    private Write() {
-      this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
-    }
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+
+      abstract Builder setFilenameSuffix(String filenameSuffix);
+
+      abstract Builder setNumShards(int numShards);
+
+      abstract Builder setShardTemplate(String shardTemplate);
+
+      abstract Builder setValidate(boolean validate);
+
+      abstract Builder setCompressionType(CompressionType compressionType);
 
-    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-                 int numShards, String shardTemplate, boolean validate,
-                 CompressionType compressionType) {
-      super(name);
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.validate = validate;
-      this.compressionType = compressionType;
+      abstract Write build();
     }
 
     /**
@@ -262,16 +274,14 @@ public class TFRecordIO {
      */
     public Write to(String filenamePrefix) {
       validateOutputComponent(filenamePrefix);
-      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
-          shardTemplate, validate, compressionType);
+      return to(StaticValueProvider.of(filenamePrefix));
     }
 
     /**
      * Like {@link #to(String)}, but with a {@link ValueProvider}.
      */
     public Write to(ValueProvider<String> filenamePrefix) {
-      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-          compressionType);
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /**
@@ -281,8 +291,7 @@ public class TFRecordIO {
      */
     public Write withSuffix(String nameExtension) {
       validateOutputComponent(nameExtension);
-      return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
-          compressionType);
+      return toBuilder().setFilenameSuffix(nameExtension).build();
     }
 
     /**
@@ -298,8 +307,7 @@ public class TFRecordIO {
      */
     public Write withNumShards(int numShards) {
       checkArgument(numShards >= 0);
-      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-          compressionType);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
@@ -308,8 +316,7 @@ public class TFRecordIO {
      * @see ShardNameTemplate
      */
     public Write withShardNameTemplate(String shardTemplate) {
-      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-          compressionType);
+      return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
@@ -323,8 +330,7 @@ public class TFRecordIO {
      * {@code .withNumShards(1).withShardNameTemplate("")}
      */
     public Write withoutSharding() {
-      return new Write(name, filenamePrefix, filenameSuffix, 1, "",
-          validate, compressionType);
+      return withNumShards(1).withShardNameTemplate("");
     }
 
     /**
@@ -335,8 +341,7 @@ public class TFRecordIO {
      * available at execution time.
      */
     public Write withoutValidation() {
-      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
-          compressionType);
+      return toBuilder().setValidate(false).build();
     }
 
     /**
@@ -347,19 +352,22 @@ public class TFRecordIO {
      * See {@link TFRecordIO.Read#withCompressionType} for more details.
      */
     public Write withCompressionType(CompressionType compressionType) {
-      return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
-          compressionType);
+      return toBuilder().setCompressionType(compressionType).build();
     }
 
     @Override
     public PDone expand(PCollection<byte[]> input) {
-      if (filenamePrefix == null) {
+      if (getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of a TFRecordIO.Write transform");
       }
       org.apache.beam.sdk.io.Write<byte[]> write =
           org.apache.beam.sdk.io.Write.to(
-              new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
+              new TFRecordSink(
+                  getFilenamePrefix(),
+                  getFilenameSuffix(),
+                  getShardTemplate(),
+                  getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -370,56 +378,29 @@ public class TFRecordIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String prefixString = filenamePrefix.isAccessible()
-          ? filenamePrefix.get() : filenamePrefix.toString();
+      String prefixString = getFilenamePrefix().isAccessible()
+          ? getFilenamePrefix().get() : getFilenamePrefix().toString();
       builder
           .addIfNotNull(DisplayData.item("filePrefix", prefixString)
               .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
               .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
                   .withLabel("Output Shard Name Template"),
               DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("validation", validate)
+          .addIfNotDefault(DisplayData.item("validation", getValidate())
               .withLabel("Validation Enabled"), true)
-          .addIfNotDefault(DisplayData.item("numShards", numShards)
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"), 0)
           .add(DisplayData
-              .item("compressionType", compressionType.toString())
+              .item("compressionType", getCompressionType().toString())
               .withLabel("Compression Type"));
     }
 
-    /**
-     * Returns the current shard name template string.
-     */
-    public String getShardNameTemplate() {
-      return shardTemplate;
-    }
-
     @Override
     protected Coder<Void> getDefaultOutputCoder() {
       return VoidCoder.of();
     }
-
-    public String getFilenamePrefix() {
-      return filenamePrefix.get();
-    }
-
-    public String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    public int getNumShards() {
-      return numShards;
-    }
-
-    public String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-
-    public boolean needsValidation() {
-      return validate;
-    }
   }
 
   /**