You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/08 17:19:07 UTC

incubator-beam git commit: Address comments during backport Dataflow PR/423 for Apache Beam PR/918

Repository: incubator-beam
Updated Branches:
  refs/heads/master fb322cc73 -> 8b39a2ea3


Address comments during backport Dataflow PR/423 for Apache Beam PR/918


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b39a2ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b39a2ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b39a2ea

Branch: refs/heads/master
Commit: 8b39a2ea3e2a70b9c07c91c375f79b9f966ab288
Parents: fb322cc
Author: Luke Cwik <lc...@google.com>
Authored: Thu Sep 8 09:34:43 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 8 10:18:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 71 ++++++++++----------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 13 ++--
 2 files changed, 40 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b39a2ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index c0761b1..3345cd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -450,25 +450,25 @@ public class TextIO {
 
     /**
      * Returns a transform for writing to text files that adds a header string to the files
-     * it writes.
+     * it writes. Note that a newline character will be added after the header.
      *
      * <p>A {@code null} value will clear any previously configured header.
      *
      * @param header the string to be added as file header
      */
-    public static Bound<String> withHeader(String header) {
+    public static Bound<String> withHeader(@Nullable String header) {
       return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
     }
 
     /**
      * Returns a transform for writing to text files that adds a footer string to the files
-     * it writes.
+     * it writes. Note that a newline character will be added after the footer.
      *
      * <p>A {@code null} value will clear any previously configured footer.
      *
      * @param footer the string to be added as file footer
      */
-    public static Bound<String> withFooter(String footer) {
+    public static Bound<String> withFooter(@Nullable String footer) {
       return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
     }
 
@@ -490,10 +490,10 @@ public class TextIO {
       private final String filenameSuffix;
 
       /** An optional header to add to each file. */
-      private final String header;
+      @Nullable private final String header;
 
       /** An optional footer to add to each file. */
-      private final String footer;
+      @Nullable private final String footer;
 
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
@@ -634,7 +634,7 @@ public class TextIO {
 
       /**
        * Returns a transform for writing to text files that adds a header string to the files
-       * it writes.
+       * it writes. Note that a newline character will be added after the header.
        *
        * <p>A {@code null} value will clear any previously configured header.
        *
@@ -642,14 +642,14 @@ public class TextIO {
        *
        * @param header the string to be added as file header
        */
-      public Bound<T> withHeader(String header) {
+      public Bound<T> withHeader(@Nullable String header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
       /**
        * Returns a transform for writing to text files that adds a footer string to the files
-       * it writes.
+       * it writes. Note that a newline character will be added after the footer.
        *
        * <p>A {@code null} value will clear any previously configured footer.
        *
@@ -657,7 +657,7 @@ public class TextIO {
        *
        * @param footer the string to be added as file footer
        */
-      public Bound<T> withFooter(String footer) {
+      public Bound<T> withFooter(@Nullable String footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
@@ -733,10 +733,12 @@ public class TextIO {
         return coder;
       }
 
+      @Nullable
       public String getHeader() {
         return header;
       }
 
+      @Nullable
       public String getFooter() {
         return footer;
       }
@@ -805,7 +807,7 @@ public class TextIO {
   private TextIO() {}
 
   /**
-   * A {@link FileBasedSource} which can decode records delimited by new line characters.
+   * A {@link FileBasedSource} which can decode records delimited by newline characters.
    *
    * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
    * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
@@ -853,7 +855,7 @@ public class TextIO {
 
     /**
      * A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
-     * which can decode records delimited by new line characters.
+     * which can decode records delimited by newline characters.
      *
      * See {@link TextSource} for further details.
      */
@@ -1016,19 +1018,20 @@ public class TextIO {
   }
 
   /**
-   * A {@link FileBasedSink} for text files. Produces text files with the new line separator
+   * A {@link FileBasedSink} for text files. Produces text files with the newline separator
    * {@code '\n'} represented in {@code UTF-8} format as the record separator.
    * Each record (including the last) is terminated.
    */
   @VisibleForTesting
   static class TextSink<T> extends FileBasedSink<T> {
     private final Coder<T> coder;
-    private final String header;
-    private final String footer;
+    @Nullable private final String header;
+    @Nullable private final String footer;
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension, String header, String footer,
+        String baseOutputFilename, String extension,
+        @Nullable String header, @Nullable String footer,
         String fileNameTemplate, Coder<T> coder) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
@@ -1047,10 +1050,11 @@ public class TextIO {
      */
     private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final Coder<T> coder;
-      private final String header;
-      private final String footer;
+      @Nullable private final String header;
+      @Nullable private final String footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder,
+          @Nullable String header, @Nullable String footer) {
         super(sink);
         this.coder = coder;
         this.header = header;
@@ -1070,20 +1074,12 @@ public class TextIO {
     private static class TextWriter<T> extends FileBasedWriter<T> {
       private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
       private final Coder<T> coder;
-      private final String header;
-      private final String footer;
+      @Nullable private final String header;
+      @Nullable private final String footer;
       private OutputStream out;
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
-        this(writeOperation, coder, null, null);
-      }
-
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
-        this(writeOperation, coder, header, null);
-      }
-
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
-                        String footer) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder,
+          @Nullable String header, @Nullable String footer) {
         super(writeOperation);
         this.header = header;
         this.footer = footer;
@@ -1091,9 +1087,12 @@ public class TextIO {
         this.coder = coder;
       }
 
-      private void writeLine(String line) throws IOException {
-        if (line != null) {
-          out.write(line.getBytes(StandardCharsets.UTF_8));
+      /**
+       * Writes {@code value} followed by a newline if {@code value} is not null.
+       */
+      private void writeIfNotNull(@Nullable String value) throws IOException {
+        if (value != null) {
+          out.write(value.getBytes(StandardCharsets.UTF_8));
           out.write(NEWLINE);
         }
       }
@@ -1105,12 +1104,12 @@ public class TextIO {
 
       @Override
       protected void writeHeader() throws Exception {
-        writeLine(header);
+        writeIfNotNull(header);
       }
 
       @Override
       protected void writeFooter() throws Exception {
-        writeLine(footer);
+        writeIfNotNull(footer);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b39a2ea/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index c60b735..859602a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -39,7 +39,6 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -54,7 +53,6 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
@@ -299,7 +297,7 @@ public class TextIOTest {
 
     for (File tmpFile : expectedFiles) {
       try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
-        List<String> currentFile = Lists.newArrayList();
+        List<String> currentFile = new ArrayList<>();
         for (;;) {
           String line = reader.readLine();
           if (line == null) {
@@ -311,8 +309,7 @@ public class TextIOTest {
       }
     }
 
-    LinkedList<String> expectedElements = Lists.newLinkedList();
-
+    List<String> expectedElements = new ArrayList<>(elems.length);
     for (int i = 0; i < elems.length; i++) {
       T elem = elems[i];
       byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
@@ -320,7 +317,7 @@ public class TextIOTest {
       expectedElements.add(line);
     }
 
-    ArrayList<String> actualElements =
+    List<String> actualElements =
         Lists.newArrayList(
             Iterables.concat(
                 FluentIterable
@@ -942,8 +939,8 @@ public class TextIOTest {
   @Test
   public void testProgressAfterSplitting() throws IOException {
     String file = "line1\nline2\nline3";
-    BoundedSource source = prepareSource(file.getBytes());
-    BoundedSource remainder;
+    BoundedSource<String> source = prepareSource(file.getBytes());
+    BoundedSource<String> remainder;
 
     // Create the remainder, verifying properties pre- and post-splitting.
     try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) {