You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/18 21:21:31 UTC
[5/6] beam git commit: Gets rid of TFRecordIO.Write.Bound
Gets rid of TFRecordIO.Write.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/76de0e46
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/76de0e46
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/76de0e46
Branch: refs/heads/master
Commit: 76de0e4609670070f8f8fe479e4919b538143d15
Parents: f5d92a4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Apr 17 17:38:08 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 14:03:42 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TFRecordIO.java | 407 +++++++------------
.../org/apache/beam/sdk/io/TFRecordIOTest.java | 6 +-
2 files changed, 154 insertions(+), 259 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/76de0e46/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index fb4ff5b..13fd4d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -66,6 +66,15 @@ public class TFRecordIO {
.build();
}
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to a TFRecord file (or
+ * multiple TFRecord files matching a sharding pattern), with each
+ * element of the input collection encoded into its own record.
+ */
+ public static Write write() {
+ return new Write();
+ }
+
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<byte[]>> {
@@ -204,45 +213,80 @@ public class TFRecordIO {
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@link PTransform} that writes a {@link PCollection} to TFRecord file (or
- * multiple TFRecord files matching a sharding pattern), with each
- * element of the input collection encoded into its own record.
- */
- public static class Write {
+ /** Implementation of {@link #write}. */
+ public static class Write extends PTransform<PCollection<byte[]>, PDone> {
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+ /** The prefix of each file written, combined with suffix and shardTemplate. */
+ private final ValueProvider<String> filenamePrefix;
+ /** The suffix of each file written, combined with prefix and shardTemplate. */
+ private final String filenameSuffix;
+
+ /** Requested number of shards. 0 for automatic. */
+ private final int numShards;
+
+ /** The shard template of each file written, combined with prefix and suffix. */
+ private final String shardTemplate;
+
+ /** An option to indicate if output validation is desired. Default is true. */
+ private final boolean validate;
+
+ /** Option to indicate the output sink's compression type. Default is NONE. */
+ private final TFRecordIO.CompressionType compressionType;
+
+ private Write() {
+ this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
+ }
+
+ private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+ int numShards, String shardTemplate, boolean validate,
+ CompressionType compressionType) {
+ super(name);
+ this.filenamePrefix = filenamePrefix;
+ this.filenameSuffix = filenameSuffix;
+ this.numShards = numShards;
+ this.shardTemplate = shardTemplate;
+ this.validate = validate;
+ this.compressionType = compressionType;
+ }
/**
- * Returns a transform for writing to TFRecord files that writes to the file(s)
- * with the given prefix. This can be a local filename
+ * Writes to TFRecord file(s) with the given prefix. This can be a local filename
* (if running locally), or a Google Cloud Storage filename of
* the form {@code "gs://<bucket>/<filepath>"}
* (if running locally or using remote execution).
*
* <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link TFRecordIO.Write.Bound#withNumShards(int)}, and end
- * in a common extension, if given by {@link TFRecordIO.Write.Bound#withSuffix(String)}.
+ * a shard identifier (see {@link #withNumShards(int)}), and end
+ * in a common extension, if given by {@link #withSuffix(String)}.
*/
- public static Bound to(String prefix) {
- return new Bound().to(prefix);
+ public Write to(String filenamePrefix) {
+ validateOutputComponent(filenamePrefix);
+ return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
+ shardTemplate, validate, compressionType);
}
/**
* Like {@link #to(String)}, but with a {@link ValueProvider}.
*/
- public static Bound to(ValueProvider<String> prefix) {
- return new Bound().to(prefix);
+ public Write to(ValueProvider<String> filenamePrefix) {
+ return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+ compressionType);
}
/**
- * Returns a transform for writing to TFRecord files that appends the specified suffix
- * to the created files.
+ * Writes to the file(s) with the given filename suffix.
+ *
+ * @see ShardNameTemplate
*/
- public static Bound withSuffix(String nameExtension) {
- return new Bound().withSuffix(nameExtension);
+ public Write withSuffix(String nameExtension) {
+ validateOutputComponent(nameExtension);
+ return new Write(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
+ compressionType);
}
/**
- * Returns a transform for writing to TFRecord files that uses the provided shard count.
+ * Writes to the provided number of shards.
*
* <p>Constraining the number of shards is likely to reduce
* the performance of a pipeline. Setting this value is not recommended
@@ -250,280 +294,131 @@ public class TFRecordIO {
*
* @param numShards the number of shards to use, or 0 to let the system
* decide.
+ * @see ShardNameTemplate
*/
- public static Bound withNumShards(int numShards) {
- return new Bound().withNumShards(numShards);
+ public Write withNumShards(int numShards) {
+ checkArgument(numShards >= 0);
+ return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+ compressionType);
}
/**
- * Returns a transform for writing to TFRecord files that uses the given shard name
- * template.
+ * Uses the given shard name template.
*
- * <p>See {@link ShardNameTemplate} for a description of shard templates.
+ * @see ShardNameTemplate
*/
- public static Bound withShardNameTemplate(String shardTemplate) {
- return new Bound().withShardNameTemplate(shardTemplate);
+ public Write withShardNameTemplate(String shardTemplate) {
+ return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+ compressionType);
}
/**
- * Returns a transform for writing to TFRecord files that forces a single file as
- * output.
+ * Forces a single file as output.
+ *
+ * <p>Constraining the number of shards is likely to reduce
+ * the performance of a pipeline. Using this setting is not recommended
+ * unless you truly require a single output file.
+ *
+ * <p>This is a shortcut for
+ * {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public static Bound withoutSharding() {
- return new Bound().withoutSharding();
+ public Write withoutSharding() {
+ return new Write(name, filenamePrefix, filenameSuffix, 1, "",
+ validate, compressionType);
}
/**
- * Returns a transform for writing to text files that has GCS path validation on
- * pipeline creation disabled.
+ * Disables GCS output path validation on pipeline creation.
*
* <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be available
- * at execution time.
+ * not exist at the pipeline creation time, but is expected to be
+ * available at execution time.
*/
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
+ public Write withoutValidation() {
+ return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
+ compressionType);
}
/**
- * Returns a transform for writing to TFRecord files like this one but writes to output files
- * using the specified compression type.
+ * Writes to output files using the specified compression type.
*
* <p>If no compression type is specified, the default is
* {@link TFRecordIO.CompressionType#NONE}.
* See {@link TFRecordIO.Read#withCompressionType} for more details.
*/
- public static Bound withCompressionType(CompressionType compressionType) {
- return new Bound().withCompressionType(compressionType);
+ public Write withCompressionType(CompressionType compressionType) {
+ return new Write(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
+ compressionType);
}
- /**
- * A PTransform that writes a bounded PCollection to a TFRecord file (or
- * multiple TFRecord files matching a sharding pattern), with each
- * PCollection element being encoded into its own record.
- */
- public static class Bound extends PTransform<PCollection<byte[]>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
- /** The prefix of each file written, combined with suffix and shardTemplate. */
- private final ValueProvider<String> filenamePrefix;
- /** The suffix of each file written, combined with prefix and shardTemplate. */
- private final String filenameSuffix;
-
- /** Requested number of shards. 0 for automatic. */
- private final int numShards;
-
- /** The shard template of each file written, combined with prefix and suffix. */
- private final String shardTemplate;
-
- /** An option to indicate if output validation is desired. Default is true. */
- private final boolean validate;
-
- /** Option to indicate the output sink's compression type. Default is NONE. */
- private final TFRecordIO.CompressionType compressionType;
-
- private Bound() {
- this(null, null, "", 0, DEFAULT_SHARD_TEMPLATE, true, TFRecordIO.CompressionType.NONE);
- }
-
- private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
- int numShards, String shardTemplate, boolean validate,
- CompressionType compressionType) {
- super(name);
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.validate = validate;
- this.compressionType = compressionType;
- }
-
- /**
- * Returns a transform for writing to TFRecord files that's like this one but
- * that writes to the file(s) with the given filename prefix.
- *
- * <p>See {@link TFRecordIO.Write#to(String) Write.to(String)} for more information.
- *
- * <p>Does not modify this object.
- */
- public Bound to(String filenamePrefix) {
- validateOutputComponent(filenamePrefix);
- return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix, numShards,
- shardTemplate, validate, compressionType);
- }
-
- /**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
- */
- public Bound to(ValueProvider<String> filenamePrefix) {
- return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
- }
-
- /**
- * Returns a transform for writing to TFRecord files that that's like this one but
- * that writes to the file(s) with the given filename suffix.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withSuffix(String nameExtension) {
- validateOutputComponent(nameExtension);
- return new Bound(name, filenamePrefix, nameExtension, numShards, shardTemplate, validate,
- compressionType);
- }
-
- /**
- * Returns a transform for writing to TFRecord files that's like this one but
- * that uses the provided shard count.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
- *
- * <p>Does not modify this object.
- *
- * @param numShards the number of shards to use, or 0 to let the system
- * decide.
- * @see ShardNameTemplate
- */
- public Bound withNumShards(int numShards) {
- checkArgument(numShards >= 0);
- return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
- }
-
- /**
- * Returns a transform for writing to TFRecord files that's like this one but
- * that uses the given shard name template.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withShardNameTemplate(String shardTemplate) {
- return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
- }
-
- /**
- * Returns a transform for writing to TFRecord files that's like this one but
- * that forces a single file as output.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Using this setting is not recommended
- * unless you truly require a single output file.
- *
- * <p>This is a shortcut for
- * {@code .withNumShards(1).withShardNameTemplate("")}
- *
- * <p>Does not modify this object.
- */
- public Bound withoutSharding() {
- return new Bound(name, filenamePrefix, filenameSuffix, 1, "",
- validate, compressionType);
- }
-
- /**
- * Returns a transform for writing to TFRecord files that's like this one but
- * that has GCS output path validation on pipeline creation disabled.
- *
- * <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be
- * available at execution time.
- *
- * <p>Does not modify this object.
- */
- public Bound withoutValidation() {
- return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, false,
- compressionType);
+ @Override
+ public PDone expand(PCollection<byte[]> input) {
+ if (filenamePrefix == null) {
+ throw new IllegalStateException(
+ "need to set the filename prefix of a TFRecordIO.Write transform");
}
-
- /**
- * Returns a transform for writing to TFRecord files like this one but writes to output files
- * using the specified compression type.
- *
- * <p>If no compression type is specified, the default is
- * {@link TFRecordIO.CompressionType#NONE}.
- * See {@link TFRecordIO.Read#withCompressionType} for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound withCompressionType(CompressionType compressionType) {
- return new Bound(name, filenamePrefix, filenameSuffix, numShards, shardTemplate, validate,
- compressionType);
+ org.apache.beam.sdk.io.Write<byte[]> write =
+ org.apache.beam.sdk.io.Write.to(
+ new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
}
+ return input.apply("Write", write);
+ }
- @Override
- public PDone expand(PCollection<byte[]> input) {
- if (filenamePrefix == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of a TFRecordIO.Write transform");
- }
- org.apache.beam.sdk.io.Write<byte[]> write =
- org.apache.beam.sdk.io.Write.to(
- new TFRecordSink(filenamePrefix, filenameSuffix, shardTemplate, compressionType));
- if (getNumShards() > 0) {
- write = write.withNumShards(getNumShards());
- }
- return input.apply("Write", write);
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
-
- String prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
- builder
- .addIfNotNull(DisplayData.item("filePrefix", prefixString)
- .withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
- .withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("validation", validate)
- .withLabel("Validation Enabled"), true)
- .addIfNotDefault(DisplayData.item("numShards", numShards)
- .withLabel("Maximum Output Shards"), 0)
- .add(DisplayData
- .item("compressionType", compressionType.toString())
- .withLabel("Compression Type"));
- }
+ String prefixString = filenamePrefix.isAccessible()
+ ? filenamePrefix.get() : filenamePrefix.toString();
+ builder
+ .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+ .withLabel("Output File Prefix"))
+ .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .withLabel("Output File Suffix"), "")
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .withLabel("Output Shard Name Template"),
+ DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"), true)
+ .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .withLabel("Maximum Output Shards"), 0)
+ .add(DisplayData
+ .item("compressionType", compressionType.toString())
+ .withLabel("Compression Type"));
+ }
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
+ /**
+ * Returns the current shard name template string.
+ */
+ public String getShardNameTemplate() {
+ return shardTemplate;
+ }
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- public String getFilenamePrefix() {
- return filenamePrefix.get();
- }
+ public String getFilenamePrefix() {
+ return filenamePrefix.get();
+ }
- public String getShardTemplate() {
- return shardTemplate;
- }
+ public String getShardTemplate() {
+ return shardTemplate;
+ }
- public int getNumShards() {
- return numShards;
- }
+ public int getNumShards() {
+ return numShards;
+ }
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
- public boolean needsValidation() {
- return validate;
- }
+ public boolean needsValidation() {
+ return validate;
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/76de0e46/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 2a455d1..9511c2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -158,7 +158,7 @@ public class TFRecordIOTest {
@Test
public void testWriteDisplayData() {
- TFRecordIO.Write.Bound write = TFRecordIO.Write
+ TFRecordIO.Write write = TFRecordIO.write()
.to("foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
@@ -255,7 +255,7 @@ public class TFRecordIOTest {
PCollection<byte[]> input = p.apply(Create.of(Arrays.asList(elems)))
.apply(ParDo.of(new StringToByteArray()));
- TFRecordIO.Write.Bound write = TFRecordIO.Write.to(filename).withoutSharding();
+ TFRecordIO.Write write = TFRecordIO.write().to(filename).withoutSharding();
input.apply(write);
p.run();
@@ -329,7 +329,7 @@ public class TFRecordIOTest {
Path baseDir = Files.createTempDirectory(tempFolder, "test-rt");
String baseFilename = baseDir.resolve(outputName).toString();
- TFRecordIO.Write.Bound write = TFRecordIO.Write.to(baseFilename)
+ TFRecordIO.Write write = TFRecordIO.write().to(baseFilename)
.withNumShards(numShards)
.withSuffix(suffix)
.withCompressionType(writeCompressionType);