You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:41:25 UTC

[35/50] [abbrv] incubator-beam git commit: Add TextIO.Write support for runtime-valued output prefix

Add TextIO.Write support for runtime-valued output prefix

* Updates to TextIO
* Updates to FileBasedSink to support this change
* Updates to other FileBasedSinks that do not yet support
  runtime values but must account for the base output filename
  now being a ValueProvider<String> instead of a String


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a038c4f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a038c4f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a038c4f

Branch: refs/heads/gearpump-runner
Commit: 9a038c4f3404a3707eca29c5e898014df7fafbf4
Parents: 26eb435
Author: Sam McVeety <sg...@google.com>
Authored: Wed Nov 30 14:06:59 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Dec 2 17:24:12 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 22 +++++++++------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 28 ++++++++++++++++----
 .../java/org/apache/beam/sdk/io/XmlSink.java    |  4 +--
 .../org/apache/beam/sdk/io/XmlSinkTest.java     |  6 ++---
 4 files changed, 42 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 5375b90..1396ab6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -41,6 +41,8 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -135,7 +137,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   /**
    * Base filename for final output files.
    */
-  protected final String baseOutputFilename;
+  protected final ValueProvider<String> baseOutputFilename;
 
   /**
    * The extension to be used for the final output files.
@@ -162,7 +164,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    */
   public FileBasedSink(String baseOutputFilename, String extension,
       WritableByteChannelFactory writableByteChannelFactory) {
-    this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+    this(StaticValueProvider.of(baseOutputFilename), extension,
+        ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
   }
 
   /**
@@ -173,7 +176,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * <p>See {@link ShardNameTemplate} for a description of file naming templates.
    */
   public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    this(baseOutputFilename, extension, fileNamingTemplate, CompressionType.UNCOMPRESSED);
+    this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate,
+        CompressionType.UNCOMPRESSED);
   }
 
   /**
@@ -182,8 +186,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    *
    * <p>See {@link ShardNameTemplate} for a description of file naming templates.
    */
-  public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate,
-      WritableByteChannelFactory writableByteChannelFactory) {
+  public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
+      String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
     this.writableByteChannelFactory = writableByteChannelFactory;
     this.baseOutputFilename = baseOutputFilename;
     if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
@@ -198,7 +202,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    * Returns the base output filename for this file based sink.
    */
   public String getBaseOutputFilename() {
-    return baseOutputFilename;
+    return baseOutputFilename.get();
   }
 
   @Override
@@ -216,7 +220,9 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     super.populateDisplayData(builder);
 
     String fileNamePattern = String.format("%s%s%s",
-        baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
+        baseOutputFilename.isAccessible()
+        ? baseOutputFilename.get() : baseOutputFilename.toString(),
+        fileNamingTemplate, getFileExtension(extension));
     builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
       .withLabel("File Name Pattern"));
   }
@@ -420,7 +426,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected final List<String> generateDestinationFilenames(int numFiles) {
       List<String> destFilenames = new ArrayList<>();
       String extension = getSink().extension;
-      String baseOutputFilename = getSink().baseOutputFilename;
+      String baseOutputFilename = getSink().baseOutputFilename.get();
       String fileNamingTemplate = getSink().fileNamingTemplate;
 
       String suffix = getFileExtension(extension);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 84c24ea..e967a27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -404,6 +404,13 @@ public class TextIO {
     }
 
     /**
+     * Like {@link #to(String)}, but with a {@link ValueProvider}.
+     */
+    public static Bound<String> to(ValueProvider<String> prefix) {
+      return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
+    }
+
+    /**
      * Returns a transform for writing to text files that appends the specified suffix
      * to the created files.
      */
@@ -521,7 +528,7 @@ public class TextIO {
       private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
       /** The prefix of each file written, combined with suffix and shardTemplate. */
-      @Nullable private final String filenamePrefix;
+      private final ValueProvider<String> filenamePrefix;
       /** The suffix of each file written, combined with prefix and shardTemplate. */
       private final String filenameSuffix;
 
@@ -554,7 +561,7 @@ public class TextIO {
             FileBasedSink.CompressionType.UNCOMPRESSED);
       }
 
-      private Bound(String name, String filenamePrefix, String filenameSuffix,
+      private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
           @Nullable String header, @Nullable String footer, Coder<T> coder, int numShards,
           String shardTemplate, boolean validate,
           WritableByteChannelFactory writableByteChannelFactory) {
@@ -581,6 +588,15 @@ public class TextIO {
        */
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
+        return new Bound<>(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+            header, footer, coder, numShards, shardTemplate, validate,
+            writableByteChannelFactory);
+      }
+
+      /**
+       * Like {@link #to(String)}, but with a {@link ValueProvider}.
+       */
+      public Bound<T> to(ValueProvider<String> filenamePrefix) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate, writableByteChannelFactory);
       }
@@ -745,8 +761,10 @@ public class TextIO {
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
 
+        String prefixString = filenamePrefix.isAccessible()
+            ? filenamePrefix.get() : filenamePrefix.toString();
         builder
-            .addIfNotNull(DisplayData.item("filePrefix", filenamePrefix)
+            .addIfNotNull(DisplayData.item("filePrefix", prefixString)
               .withLabel("Output File Prefix"))
             .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
               .withLabel("Output Fix Suffix"), "")
@@ -779,7 +797,7 @@ public class TextIO {
       }
 
       public String getFilenamePrefix() {
-        return filenamePrefix;
+        return filenamePrefix.get();
       }
 
       public String getShardTemplate() {
@@ -1101,7 +1119,7 @@ public class TextIO {
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension,
+        ValueProvider<String> baseOutputFilename, String extension,
         @Nullable String header, @Nullable String footer,
         String fileNameTemplate, Coder<T> coder,
         WritableByteChannelFactory writableByteChannelFactory) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 983eed2..0f25aea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -176,7 +176,7 @@ public class XmlSink {
      * <p>The specified class must be able to be used to create a JAXB context.
      */
     public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
+      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
     }
 
     /**
@@ -194,7 +194,7 @@ public class XmlSink {
      * supplied name.
      */
     public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
+      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9a038c4f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 400b04a..f9a9655 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -93,7 +93,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename);
+    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
   }
 
   /**
@@ -105,7 +105,7 @@ public class XmlSinkTest {
         XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename);
+    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
   }
 
   /**
@@ -142,7 +142,7 @@ public class XmlSinkTest {
         XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
     assertEquals(testClass, writeOp.getSink().classToBind);
-    assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename);
+    assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get());
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
     assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
     Path outputPath = new File(testFilePrefix).toPath();