You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:41:25 UTC
[35/50] [abbrv] incubator-beam git commit: Add TextIO.Write support
for runtime-valued output prefix
Add TextIO.Write support for runtime-valued output prefix
* Updates to TextIO
* Updates for FileBasedSink to support this change
* Updates to other FileBasedSinks that do not yet support
runtime values but must now handle a base output filename of
type ValueProvider<String> instead of String
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a038c4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a038c4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a038c4f
Branch: refs/heads/gearpump-runner
Commit: 9a038c4f3404a3707eca29c5e898014df7fafbf4
Parents: 26eb435
Author: Sam McVeety <sg...@google.com>
Authored: Wed Nov 30 14:06:59 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Dec 2 17:24:12 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSink.java | 22 +++++++++------
.../java/org/apache/beam/sdk/io/TextIO.java | 28 ++++++++++++++++----
.../java/org/apache/beam/sdk/io/XmlSink.java | 4 +--
.../org/apache/beam/sdk/io/XmlSinkTest.java | 6 ++---
4 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 5375b90..1396ab6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -41,6 +41,8 @@ import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
@@ -135,7 +137,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
/**
* Base filename for final output files.
*/
- protected final String baseOutputFilename;
+ protected final ValueProvider<String> baseOutputFilename;
/**
* The extension to be used for the final output files.
@@ -162,7 +164,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*/
public FileBasedSink(String baseOutputFilename, String extension,
WritableByteChannelFactory writableByteChannelFactory) {
- this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+ this(StaticValueProvider.of(baseOutputFilename), extension,
+ ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
}
/**
@@ -173,7 +176,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
*/
public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
- this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED);
+ this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate,
+ CompressionType.UNCOMPRESSED);
}
/**
@@ -182,8 +186,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
*
* <p>See {@link ShardNameTemplate} for a description of file naming templates.
*/
- public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate,
- WritableByteChannelFactory writableByteChannelFactory) {
+ public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
+ String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
this.writableByteChannelFactory = writableByteChannelFactory;
this.baseOutputFilename = baseOutputFilename;
if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
@@ -198,7 +202,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
* Returns the base output filename for this file based sink.
*/
public String getBaseOutputFilename() {
- return baseOutputFilename;
+ return baseOutputFilename.get();
}
@Override
@@ -216,7 +220,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
super.populateDisplayData(builder);
String fileNamePattern = String.format("%s%s%s",
- baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
+ baseOutputFilename.isAccessible()
+ ? baseOutputFilename.get() : baseOutputFilename.toString(),
+ fileNamingTemplate, getFileExtension(extension));
builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
.withLabel("File Name Pattern"));
}
@@ -420,7 +426,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
protected final List<String> generateDestinationFilenames(int numFiles) {
List<String> destFilenames = new ArrayList<>();
String extension = getSink().extension;
- String baseOutputFilename = getSink().baseOutputFilename;
+ String baseOutputFilename = getSink().baseOutputFilename.get();
String fileNamingTemplate = getSink().fileNamingTemplate;
String suffix = getFileExtension(extension);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 84c24ea..e967a27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -404,6 +404,13 @@ public class TextIO {
}
/**
+ * Like {@link #to(String)}, but with a {@link ValueProvider}.
+ */
+ public static Bound<String> to(ValueProvider<String> prefix) {
+ return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
+ }
+
+ /**
* Returns a transform for writing to text files that appends the specified suffix
* to the created files.
*/
@@ -521,7 +528,7 @@ public class TextIO {
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/** The prefix of each file written, combined with suffix and shardTemplate. */
- @Nullable private final String filenamePrefix;
+ private final ValueProvider<String> filenamePrefix;
/** The suffix of each file written, combined with prefix and shardTemplate. */
private final String filenameSuffix;
@@ -554,7 +561,7 @@ public class TextIO {
FileBasedSink.CompressionType.UNCOMPRESSED);
}
- private Bound(String name, String filenamePrefix, String filenameSuffix,
+ private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
@Nullable String header, @Nullable String footer, Coder<T> coder, int numShards,
String shardTemplate, boolean validate,
WritableByteChannelFactory writableByteChannelFactory) {
@@ -581,6 +588,15 @@ public class TextIO {
*/
public Bound<T> to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
+ return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+ header, footer, coder, numShards, shardTemplate, validate,
+ writableByteChannelFactory);
+ }
+
+ /**
+ * Like {@link #to(String)}, but with a {@link ValueProvider}.
+ */
+ public Bound<T> to(ValueProvider<String> filenamePrefix) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate, writableByteChannelFactory);
}
@@ -745,8 +761,10 @@ public class TextIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
+ String prefixString = filenamePrefix.isAccessible()
+ ? filenamePrefix.get() : filenamePrefix.toString();
builder
- .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+ .addIfNotNull(DisplayData.item("filePrefix", prefixString)
.withLabel("Output File Prefix"))
.addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
.withLabel("Output Fix Suffix"), "")
@@ -779,7 +797,7 @@ public class TextIO {
}
public String getFilenamePrefix() {
- return filenamePrefix;
+ return filenamePrefix.get();
}
public String getShardTemplate() {
@@ -1101,7 +1119,7 @@ public class TextIO {
@VisibleForTesting
TextSink(
- String baseOutputFilename, String extension,
+ ValueProvider<String> baseOutputFilename, String extension,
@Nullable String header, @Nullable String footer,
String fileNameTemplate, Coder<T> coder,
WritableByteChannelFactory writableByteChannelFactory) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 983eed2..0f25aea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -176,7 +176,7 @@ public class XmlSink {
* <p>The specified class must be able to be used to create a JAXB context.
*/
public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
+ return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
}
/**
@@ -194,7 +194,7 @@ public class XmlSink {
* supplied name.
*/
public Bound<T> withRootElement(String rootElementName) {
- return new Bound<>(classToBind, rootElementName, baseOutputFilename);
+ return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 400b04a..f9a9655 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -93,7 +93,7 @@ public class XmlSinkTest {
.withRootElement(testRootElement);
assertEquals(testClass, sink.classToBind);
assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.baseOutputFilename);
+ assertEquals(testFilePrefix, sink.baseOutputFilename.get());
}
/**
@@ -105,7 +105,7 @@ public class XmlSinkTest {
XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
assertEquals(testClass, sink.classToBind);
assertEquals(testRootElement, sink.rootElementName);
- assertEquals(testFilePrefix, sink.baseOutputFilename);
+ assertEquals(testFilePrefix, sink.baseOutputFilename.get());
}
/**
@@ -142,7 +142,7 @@ public class XmlSinkTest {
XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
assertEquals(testClass, writeOp.getSink().classToBind);
- assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename);
+ assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get());
assertEquals(testRootElement, writeOp.getSink().rootElementName);
assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
Path outputPath = new File(testFilePrefix).toPath();