You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/18 21:21:28 UTC
[2/6] beam git commit: Converts TFRecordIO.Write to AutoValue
Converts TFRecordIO.Write to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b77b6fbd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b77b6fbd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b77b6fbd
Branch: refs/heads/master
Commit: b77b6fbd5654420ee3f1cf415ab9ee19d277ff0c
Parents: 76de0e4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 17:50:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 14:03:42 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 127 ++++++++-----------
1 file changed, 54 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b77b6fbd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 13fd4d1..2e01a80 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -72,7 +72,13 @@ public class TFRecordIO {
* element of the input collection encoded into its own record.
*/
public static Write write() {
- return new Write();
+ return new AutoValue_TFRecordIO_Write.Builder()
+ .setFilenameSuffix("")
+ .setNumShards(0)
+ .setValidate(true)
+ .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+ .setCompressionType(CompressionType.NONE)
+ .build();
}
/** Implementation of {@link #read}. */
@@ -214,40 +220,46 @@ public class TFRecordIO {
/////////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
- public static class Write extends PTransform<PCollection<byte[]>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/** The prefix of each file written, combined with suffix and shardTemplate. */
- private final ValueProvider<String> filenamePrefix;
+ @Nullable
+ abstract ValueProvider<String> getFilenamePrefix();
+
/** The suffix of each file written, combined with prefix and shardTemplate. */
- private final String filenameSuffix;
+ abstract String getFilenameSuffix();
/** Requested number of shards. 0 for automatic. */
- private final int numShards;
+ abstract int getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
- private final String shardTemplate;
+ abstract String getShardTemplate();
/** An option to indicate if output validation is desired. Default is true. */
- private final boolean validate;
+ abstract boolean getValidate();
/** Option to indicate the output sink's compression type. Default is NONE. */
- private final TFRecordIO.CompressionType compressionType;
+ abstract CompressionType getCompressionType();
- private Write() {
- this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
- }
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+
+ abstract Builder setFilenameSuffix(String filenameSuffix);
+
+ abstract Builder setNumShards(int numShards);
+
+ abstract Builder setShardTemplate(String shardTemplate);
+
+ abstract Builder setValidate(boolean validate);
+
+ abstract Builder setCompressionType(CompressionType compressionType);
- private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
- int numShards, String shardTemplate, boolean validate,
- CompressionType compressionType) {
- super(name);
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.validate = validate;
- this.compressionType = compressionType;
+ abstract Write build();
}
/**
@@ -262,16 +274,14 @@ public class TFRecordIO {
*/
public Write to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
- return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
- shardTemplate, validate, compressionType);
+ return to(StaticValueProvider.of(filenamePrefix));
}
/**
* Like {@link #to(String)}, but with a {@link ValueProvider}.
*/
public Write to(ValueProvider<String> filenamePrefix) {
- return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
+ return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
/**
@@ -281,8 +291,7 @@ public class TFRecordIO {
*/
public Write withSuffix(String nameExtension) {
validateOutputComponent(nameExtension);
- return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
- compressionType);
+ return toBuilder().setFilenameSuffix(nameExtension).build();
}
/**
@@ -298,8 +307,7 @@ public class TFRecordIO {
*/
public Write withNumShards(int numShards) {
checkArgument(numShards >= 0);
- return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
+ return toBuilder().setNumShards(numShards).build();
}
/**
@@ -308,8 +316,7 @@ public class TFRecordIO {
* @see ShardNameTemplate
*/
public Write withShardNameTemplate(String shardTemplate) {
- return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
+ return toBuilder().setShardTemplate(shardTemplate).build();
}
/**
@@ -323,8 +330,7 @@ public class TFRecordIO {
* {@code .withNumShards(1).withShardNameTemplate("")}
*/
public Write withoutSharding() {
- return new Write(name, filenamePrefix, filenameSuffix, 1, "",
- validate, compressionType);
+ return withNumShards(1).withShardNameTemplate("");
}
/**
@@ -335,8 +341,7 @@ public class TFRecordIO {
* available at execution time.
*/
public Write withoutValidation() {
- return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
- compressionType);
+ return toBuilder().setValidate(false).build();
}
/**
@@ -347,19 +352,22 @@ public class TFRecordIO {
* See {@link TFRecordIO.Read#withCompressionType} for more details.
*/
public Write withCompressionType(CompressionType compressionType) {
- return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
+ return toBuilder().setCompressionType(compressionType).build();
}
@Override
public PDone expand(PCollection<byte[]> input) {
- if (filenamePrefix == null) {
+ if (getFilenamePrefix() == null) {
throw new IllegalStateException(
"need to set the filename prefix of a TFRecordIO.Write transform");
}
org.apache.beam.sdk.io.Write<byte[]> write =
org.apache.beam.sdk.io.Write.to(
- new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
+ new TFRecordSink(
+ getFilenamePrefix(),
+ getFilenameSuffix(),
+ getShardTemplate(),
+ getCompressionType()));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -370,56 +378,29 @@ public class TFRecordIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
+ String prefixString = getFilenamePrefix().isAccessible()
+ ? getFilenamePrefix().get() : getFilenamePrefix().toString();
builder
.addIfNotNull(DisplayData.item("filePrefix", prefixString)
.withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
.withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
.withLabel("Output Shard Name Template"),
DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("validation", validate)
+ .addIfNotDefault(DisplayData.item("validation", getValidate())
.withLabel("Validation Enabled"), true)
- .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"), 0)
.add(DisplayData
- .item("compressionType", compressionType.toString())
+ .item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"));
}
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
-
@Override
protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
}
-
- public String getFilenamePrefix() {
- return filenamePrefix.get();
- }
-
- public String getShardTemplate() {
- return shardTemplate;
- }
-
- public int getNumShards() {
- return numShards;
- }
-
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
-
- public boolean needsValidation() {
- return validate;
- }
}
/**