You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/07 22:39:37 UTC

[1/8] incubator-beam git commit: Revised according to comments following a code review.

Repository: incubator-beam
Updated Branches:
  refs/heads/master 26635d7fb -> f33296c7f


Revised according to comments following a code review.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af7437d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af7437d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af7437d1

Branch: refs/heads/master
Commit: af7437d1c8270c5b6f175b3d5ae90160bf6477b4
Parents: 4691231
Author: Stas Levin <st...@gmail.com>
Authored: Tue Sep 6 23:22:11 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:30 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 72 +++++++++++--------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 73 +++++++++++++++-----
 2 files changed, 98 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af7437d1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index c754a0b..eefa867 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -35,12 +36,14 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -474,10 +477,10 @@ public class TextIO {
       private final String filenameSuffix;
 
       /** An optional header to add to each file. */
-      private final String header;
+      private final T header;
 
       /** An optional footer to add to each file. */
-      private final String footer;
+      private final T footer;
 
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
@@ -495,8 +498,8 @@ public class TextIO {
         this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
       }
 
-      private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
-                    String footer, Coder<T> coder, int numShards, String shardTemplate,
+      private Bound(String name, String filenamePrefix, String filenameSuffix, T header,
+                    T footer, Coder<T> coder, int numShards, String shardTemplate,
                     boolean validate) {
         super(name);
         this.header = header;
@@ -509,6 +512,14 @@ public class TextIO {
         this.validate = validate;
       }
 
+      private String asString(T obj, Coder<T> coder) {
+        try {
+          return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj));
+        } catch (CoderException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
       /**
        * Returns a transform for writing to text files that's like this one but
        * that writes to the file(s) with the given filename prefix.
@@ -594,9 +605,8 @@ public class TextIO {
        * the elements of the input {@link PCollection PCollection<X>} into an
        * output text line. Does not modify this object.
        *
-       * @param <X> the type of the elements of the input {@link PCollection}
        */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
+      public Bound<?> withCoder(Coder<? super T> coder) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
@@ -616,12 +626,12 @@ public class TextIO {
             shardTemplate, false);
       }
 
-      public Bound<T> withHeader(String header) {
+      public Bound<T> withHeader(T header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
-      public Bound<T> withFooter(String footer) {
+      public Bound<T> withFooter(T footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
@@ -659,10 +669,10 @@ public class TextIO {
               .withLabel("Validation Enabled"), true)
             .addIfNotDefault(DisplayData.item("numShards", numShards)
               .withLabel("Maximum Output Shards"), 0)
-            .addIfNotNull(DisplayData.item("fileHeader", header)
-              .withLabel("Output file header"))
-            .addIfNotNull(DisplayData.item("fileFooter", footer)
-                .withLabel("Output file footer"));
+            .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder))
+              .withLabel("File Header"))
+            .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder))
+                .withLabel("File Footer"));
       }
 
       /**
@@ -697,11 +707,11 @@ public class TextIO {
         return coder;
       }
 
-      public String getHeader() {
+      public T getHeader() {
         return header;
       }
 
-      public String getFooter() {
+      public T getFooter() {
         return footer;
       }
 
@@ -987,17 +997,21 @@ public class TextIO {
   @VisibleForTesting
   static class TextSink<T> extends FileBasedSink<T> {
     private final Coder<T> coder;
-    private final String header;
-    private final String footer;
+    private final byte[] header;
+    private final byte[] footer;
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension, String header, String footer,
+        String baseOutputFilename, String extension, T header, T footer,
         String fileNameTemplate, Coder<T> coder) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
-      this.header = header;
-      this.footer = footer;
+      try {
+        this.header = header == null ? null : CoderUtils.encodeToByteArray(coder, header);
+        this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer);
+      } catch (CoderException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -1011,10 +1025,10 @@ public class TextIO {
      */
     private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final Coder<T> coder;
-      private final String header;
-      private final String footer;
+      private final byte[] header;
+      private final byte[] footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) {
         super(sink);
         this.coder = coder;
         this.header = header;
@@ -1034,20 +1048,20 @@ public class TextIO {
     private static class TextWriter<T> extends FileBasedWriter<T> {
       private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
       private final Coder<T> coder;
-      private final String header;
-      private final String footer;
+      private final byte[] header;
+      private final byte[] footer;
       private OutputStream out;
 
       public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
         this(writeOperation, coder, null, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) {
         this(writeOperation, coder, header, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
-                        String footer) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header,
+                        byte[] footer) {
         super(writeOperation);
         this.header = header;
         this.footer = footer;
@@ -1055,9 +1069,9 @@ public class TextIO {
         this.coder = coder;
       }
 
-      private void writeLine(String line) throws IOException {
+      private void writeLine(byte[] line) throws IOException {
         if (line != null) {
-          out.write(line.getBytes(StandardCharsets.UTF_8));
+          out.write(line);
           out.write(NEWLINE);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af7437d1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2ab2683..7028761 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -33,7 +33,11 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.BufferedReader;
@@ -229,11 +233,11 @@ public class TextIOTest {
     runTestWrite(elems, null, null, coder, numShards);
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
+  <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception {
     runTestWrite(elems, header, footer, coder, 1);
   }
 
-  <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
+  <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards)
       throws Exception {
     String outputName = "file.txt";
     String baseFilename = tmpFolder.newFile(outputName).getPath();
@@ -248,7 +252,7 @@ public class TextIOTest {
       // T==String
       write = (TextIO.Write.Bound<T>) writeStrings;
     } else {
-      write = TextIO.Write.to(baseFilename).withCoder(coder);
+      write = TextIO.Write.withCoder(coder).to(baseFilename);
     }
     write = write.withHeader(header).withFooter(footer);
 
@@ -267,9 +271,9 @@ public class TextIOTest {
 
   public static <T> void assertOutputFiles(
       T[] elems,
-      String header,
-      String footer,
-      Coder<T> coder,
+      final T header,
+      final T footer,
+      final Coder<T> coder,
       int numShards,
       TemporaryFolder rootLocation,
       String outputName,
@@ -291,36 +295,69 @@ public class TextIOTest {
       }
     }
 
-    List<String> actual = new ArrayList<>();
+    List<List<String>> actual = new ArrayList<>();
+
     for (File tmpFile : expectedFiles) {
       try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
+        List<String> currentFile = Lists.newArrayList();
         for (;;) {
           String line = reader.readLine();
           if (line == null) {
             break;
           }
-          actual.add(line);
+          currentFile.add(line);
         }
+        actual.add(currentFile);
       }
     }
 
-    LinkedList<String> expected = Lists.newLinkedList();
+    LinkedList<String> expectedElements = Lists.newLinkedList();
 
     for (int i = 0; i < elems.length; i++) {
       T elem = elems[i];
       byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
       String line = new String(encodedElem);
-      expected.add(line);
+      expectedElements.add(line);
     }
 
-    if (header != null) {
-      expected.addFirst(header);
-    }
-    if (footer != null) {
-      expected.addLast(footer);
-    }
-
-    assertThat(actual, containsInAnyOrder(expected.toArray()));
+    final String headerString =
+        header == null ? null : new String(CoderUtils.encodeToByteArray(coder, header));
+
+    final String footerString =
+        footer == null ? null : new String(CoderUtils.encodeToByteArray(coder, footer));
+
+    ArrayList<String> actualElements =
+        Lists.newArrayList(
+            Iterables.concat(
+                FluentIterable
+                    .from(actual)
+                    .transform(new Function<List<String>, List<String>>() {
+                      @Nullable
+                      @Override
+                      public List<String> apply(List<String> lines) {
+                        ArrayList<String> newLines = Lists.newArrayList(lines);
+                        if (headerString != null) {
+                          newLines.remove(0);
+                        }
+                        if (footerString != null) {
+                          int last = newLines.size() - 1;
+                          newLines.remove(last);
+                        }
+                        return newLines;
+                      }
+                    })
+                    .toList()));
+
+    assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+
+    assertTrue(Iterables.all(actual, new Predicate<List<String>>() {
+      @Override
+      public boolean apply(@Nullable List<String> fileLines) {
+        int last = fileLines.size() - 1;
+        return (headerString == null || fileLines.get(0).equals(headerString))
+              && (footerString == null || fileLines.get(last).equals(footerString));
+      }
+    }));
   }
 
   @Test


[4/8] incubator-beam git commit: Reverted header and footer to be of type String.

Posted by lc...@apache.org.
Reverted header and footer to be of type String.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e8ff41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e8ff41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e8ff41e

Branch: refs/heads/master
Commit: 0e8ff41e34c9bc332148487c7c52ed99051b5ff7
Parents: af7437d
Author: Stas Levin <st...@gmail.com>
Authored: Wed Sep 7 09:57:17 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:31 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 68 ++++++++------------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 68 +++++++++++---------
 2 files changed, 63 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e8ff41e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index eefa867..0895123 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -36,14 +35,12 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -477,10 +474,10 @@ public class TextIO {
       private final String filenameSuffix;
 
       /** An optional header to add to each file. */
-      private final T header;
+      private final String header;
 
       /** An optional footer to add to each file. */
-      private final T footer;
+      private final String footer;
 
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
@@ -498,8 +495,8 @@ public class TextIO {
         this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
       }
 
-      private Bound(String name, String filenamePrefix, String filenameSuffix, T header,
-                    T footer, Coder<T> coder, int numShards, String shardTemplate,
+      private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
+                    String footer, Coder<T> coder, int numShards, String shardTemplate,
                     boolean validate) {
         super(name);
         this.header = header;
@@ -512,14 +509,6 @@ public class TextIO {
         this.validate = validate;
       }
 
-      private String asString(T obj, Coder<T> coder) {
-        try {
-          return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj));
-        } catch (CoderException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
       /**
        * Returns a transform for writing to text files that's like this one but
        * that writes to the file(s) with the given filename prefix.
@@ -605,8 +594,9 @@ public class TextIO {
        * the elements of the input {@link PCollection PCollection<X>} into an
        * output text line. Does not modify this object.
        *
+       * @param <X> the type of the elements of the input {@link PCollection}
        */
-      public Bound<?> withCoder(Coder<? super T> coder) {
+      public <X> Bound<X> withCoder(Coder<X> coder) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
@@ -626,12 +616,12 @@ public class TextIO {
             shardTemplate, false);
       }
 
-      public Bound<T> withHeader(T header) {
+      public Bound<T> withHeader(String header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
-      public Bound<T> withFooter(T footer) {
+      public Bound<T> withFooter(String footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
@@ -669,9 +659,9 @@ public class TextIO {
               .withLabel("Validation Enabled"), true)
             .addIfNotDefault(DisplayData.item("numShards", numShards)
               .withLabel("Maximum Output Shards"), 0)
-            .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder))
+            .addIfNotNull(DisplayData.item("fileHeader", header)
               .withLabel("File Header"))
-            .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder))
+            .addIfNotNull(DisplayData.item("fileFooter", footer)
                 .withLabel("File Footer"));
       }
 
@@ -707,11 +697,11 @@ public class TextIO {
         return coder;
       }
 
-      public T getHeader() {
+      public String getHeader() {
         return header;
       }
 
-      public T getFooter() {
+      public String getFooter() {
         return footer;
       }
 
@@ -997,21 +987,17 @@ public class TextIO {
   @VisibleForTesting
   static class TextSink<T> extends FileBasedSink<T> {
     private final Coder<T> coder;
-    private final byte[] header;
-    private final byte[] footer;
+    private final String header;
+    private final String footer;
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension, T header, T footer,
+        String baseOutputFilename, String extension, String header, String footer,
         String fileNameTemplate, Coder<T> coder) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
-      try {
-        this.header = header == null ? null : CoderUtils.encodeToByteArray(coder, header);
-        this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer);
-      } catch (CoderException e) {
-        throw new RuntimeException(e);
-      }
+      this.header = header;
+      this.footer = footer;
     }
 
     @Override
@@ -1025,10 +1011,10 @@ public class TextIO {
      */
     private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final Coder<T> coder;
-      private final byte[] header;
-      private final byte[] footer;
+      private final String header;
+      private final String footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) {
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
         super(sink);
         this.coder = coder;
         this.header = header;
@@ -1048,20 +1034,20 @@ public class TextIO {
     private static class TextWriter<T> extends FileBasedWriter<T> {
       private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
       private final Coder<T> coder;
-      private final byte[] header;
-      private final byte[] footer;
+      private final String header;
+      private final String footer;
       private OutputStream out;
 
       public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
         this(writeOperation, coder, null, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
         this(writeOperation, coder, header, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header,
-                        byte[] footer) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
+                        String footer) {
         super(writeOperation);
         this.header = header;
         this.footer = footer;
@@ -1069,9 +1055,9 @@ public class TextIO {
         this.coder = coder;
       }
 
-      private void writeLine(byte[] line) throws IOException {
+      private void writeLine(String line) throws IOException {
         if (line != null) {
-          out.write(line);
+          out.write(line.getBytes(StandardCharsets.UTF_8));
           out.write(NEWLINE);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e8ff41e/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 7028761..c60b735 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -233,11 +233,11 @@ public class TextIOTest {
     runTestWrite(elems, null, null, coder, numShards);
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception {
+  <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
     runTestWrite(elems, header, footer, coder, 1);
   }
 
-  <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards)
+  <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
       throws Exception {
     String outputName = "file.txt";
     String baseFilename = tmpFolder.newFile(outputName).getPath();
@@ -252,7 +252,7 @@ public class TextIOTest {
       // T==String
       write = (TextIO.Write.Bound<T>) writeStrings;
     } else {
-      write = TextIO.Write.withCoder(coder).to(baseFilename);
+      write = TextIO.Write.to(baseFilename).withCoder(coder);
     }
     write = write.withHeader(header).withFooter(footer);
 
@@ -271,9 +271,9 @@ public class TextIOTest {
 
   public static <T> void assertOutputFiles(
       T[] elems,
-      final T header,
-      final T footer,
-      final Coder<T> coder,
+      final String header,
+      final String footer,
+      Coder<T> coder,
       int numShards,
       TemporaryFolder rootLocation,
       String outputName,
@@ -320,44 +320,48 @@ public class TextIOTest {
       expectedElements.add(line);
     }
 
-    final String headerString =
-        header == null ? null : new String(CoderUtils.encodeToByteArray(coder, header));
-
-    final String footerString =
-        footer == null ? null : new String(CoderUtils.encodeToByteArray(coder, footer));
-
     ArrayList<String> actualElements =
         Lists.newArrayList(
             Iterables.concat(
                 FluentIterable
                     .from(actual)
-                    .transform(new Function<List<String>, List<String>>() {
-                      @Nullable
-                      @Override
-                      public List<String> apply(List<String> lines) {
-                        ArrayList<String> newLines = Lists.newArrayList(lines);
-                        if (headerString != null) {
-                          newLines.remove(0);
-                        }
-                        if (footerString != null) {
-                          int last = newLines.size() - 1;
-                          newLines.remove(last);
-                        }
-                        return newLines;
-                      }
-                    })
+                    .transform(removeHeaderAndFooter(header, footer))
                     .toList()));
 
     assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
 
-    assertTrue(Iterables.all(actual, new Predicate<List<String>>() {
+    assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer)));
+  }
+
+  private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header,
+                                                                            final String footer) {
+    return new Function<List<String>, List<String>>() {
+      @Nullable
+      @Override
+      public List<String> apply(List<String> lines) {
+        ArrayList<String> newLines = Lists.newArrayList(lines);
+        if (header != null) {
+          newLines.remove(0);
+        }
+        if (footer != null) {
+          int last = newLines.size() - 1;
+          newLines.remove(last);
+        }
+        return newLines;
+      }
+    };
+  }
+
+  private static Predicate<List<String>> haveProperHeaderAndFooter(final String header,
+                                                                   final String footer) {
+    return new Predicate<List<String>>() {
       @Override
-      public boolean apply(@Nullable List<String> fileLines) {
+      public boolean apply(List<String> fileLines) {
         int last = fileLines.size() - 1;
-        return (headerString == null || fileLines.get(0).equals(headerString))
-              && (footerString == null || fileLines.get(last).equals(footerString));
+        return (header == null || fileLines.get(0).equals(header))
+            && (footer == null || fileLines.get(last).equals(footer));
       }
-    }));
+    };
   }
 
   @Test


[7/8] incubator-beam git commit: !fixup Minor javadoc clean-up

Posted by lc...@apache.org.
!fixup Minor javadoc clean-up


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ce1294dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ce1294dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ce1294dd

Branch: refs/heads/master
Commit: ce1294dd5dfad80b7a154bb1ef52836b7770eb32
Parents: f21875e
Author: Luke Cwik <lc...@google.com>
Authored: Wed Sep 7 15:38:16 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:38:16 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/TextIO.java    | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ce1294dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index e4fcc32..c0761b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -452,7 +452,7 @@ public class TextIO {
      * Returns a transform for writing to text files that adds a header string to the files
      * it writes.
      *
-     *<p> A <code>null</code> value will clear any previously configured header.</p>
+     * <p>A {@code null} value will clear any previously configured header.
      *
      * @param header the string to be added as file header
      */
@@ -464,7 +464,7 @@ public class TextIO {
      * Returns a transform for writing to text files that adds a footer string to the files
      * it writes.
      *
-     *<p> A <code>null</code> value will clear any previously configured footer.</p>
+     * <p>A {@code null} value will clear any previously configured footer.
      *
      * @param footer the string to be added as file footer
      */
@@ -636,7 +636,9 @@ public class TextIO {
        * Returns a transform for writing to text files that adds a header string to the files
        * it writes.
        *
-       *<p> A <code>null</code> value will clear any previously configured header.</p>
+       * <p>A {@code null} value will clear any previously configured header.
+       *
+       * <p>Does not modify this object.
        *
        * @param header the string to be added as file header
        */
@@ -649,7 +651,9 @@ public class TextIO {
        * Returns a transform for writing to text files that adds a footer string to the files
        * it writes.
        *
-       *<p> A <code>null</code> value will clear any previously configured footer.</p>
+       * <p>A {@code null} value will clear any previously configured footer.
+       *
+       * <p>Does not modify this object.
        *
        * @param footer the string to be added as file footer
        */


[3/8] incubator-beam git commit: Added even more javadoc to TextIO#withHeader and TextIO#withFooter (2).

Posted by lc...@apache.org.
Added even more javadoc to TextIO#withHeader and TextIO#withFooter (2).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f21875e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f21875e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f21875e2

Branch: refs/heads/master
Commit: f21875e2fe85572e83c933ba409626d509209119
Parents: b1176de
Author: Stas Levin <st...@gmail.com>
Authored: Thu Sep 8 00:02:52 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:31 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/TextIO.java    | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f21875e2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f09ab6c..e4fcc32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -632,11 +632,27 @@ public class TextIO {
             shardTemplate, false);
       }
 
+      /**
+       * Returns a transform for writing to text files that adds a header string to the files
+       * it writes.
+       *
+       *<p> A <code>null</code> value will clear any previously configured header.</p>
+       *
+       * @param header the string to be added as file header
+       */
       public Bound<T> withHeader(String header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
+      /**
+       * Returns a transform for writing to text files that adds a footer string to the files
+       * it writes.
+       *
+       *<p> A <code>null</code> value will clear any previously configured footer.</p>
+       *
+       * @param footer the string to be added as file footer
+       */
       public Bound<T> withFooter(String footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);


[6/8] incubator-beam git commit: Added even more javadoc to TextIO#withHeader and TextIO#withFooter.

Posted by lc...@apache.org.
Added even more javadoc to TextIO#withHeader and TextIO#withFooter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1176de9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1176de9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1176de9

Branch: refs/heads/master
Commit: b1176de93b0e5dcff17901f8b5049e677213d62e
Parents: 94c58b6
Author: Stas Levin <st...@gmail.com>
Authored: Wed Sep 7 18:38:28 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:31 2016 -0700

----------------------------------------------------------------------
 sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1176de9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index e75aaa9..f09ab6c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -452,6 +452,8 @@ public class TextIO {
      * Returns a transform for writing to text files that adds a header string to the files
      * it writes.
      *
+     *<p> A <code>null</code> value will clear any previously configured header.</p>
+     *
      * @param header the string to be added as file header
      */
     public static Bound<String> withHeader(String header) {
@@ -462,6 +464,8 @@ public class TextIO {
      * Returns a transform for writing to text files that adds a footer string to the files
      * it writes.
      *
+     *<p> A <code>null</code> value will clear any previously configured footer.</p>
+     *
      * @param footer the string to be added as file footer
      */
     public static Bound<String> withFooter(String footer) {


[2/8] incubator-beam git commit: Add header/footer support to TextIO.Write

Posted by lc...@apache.org.
Add header/footer support to TextIO.Write


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4691231b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4691231b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4691231b

Branch: refs/heads/master
Commit: 4691231b036950c5ca9c5c78b512933e61fae076
Parents: 26635d7
Author: Stas Levin <st...@gmail.com>
Authored: Mon Sep 5 20:26:12 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:30 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 114 ++++++++++++++++---
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  60 +++++++++-
 2 files changed, 152 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4691231b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 242470b..c754a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -448,7 +448,15 @@ public class TextIO {
       return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
     }
 
-    // TODO: appendingNewlines, header, footer, etc.
+    public static Bound<String> withHeader(String header) {
+      return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
+    }
+
+    public static Bound<String> withFooter(String footer) {
+      return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
+    }
+
+    // TODO: appendingNewlines, etc.
 
     /**
      * A PTransform that writes a bounded PCollection to a text file (or
@@ -465,6 +473,12 @@ public class TextIO {
       /** The suffix of each file written, combined with prefix and shardTemplate. */
       private final String filenameSuffix;
 
+      /** An optional header to add to each file. */
+      private final String header;
+
+      /** An optional footer to add to each file. */
+      private final String footer;
+
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
 
@@ -478,12 +492,15 @@ public class TextIO {
       private final boolean validate;
 
       Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true);
+        this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
       }
 
-      private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
-          int numShards, String shardTemplate, boolean validate) {
+      private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
+                    String footer, Coder<T> coder, int numShards, String shardTemplate,
+                    boolean validate) {
         super(name);
+        this.header = header;
+        this.footer = footer;
         this.coder = coder;
         this.filenamePrefix = filenamePrefix;
         this.filenameSuffix = filenameSuffix;
@@ -502,7 +519,7 @@ public class TextIO {
        */
       public Bound<T> to(String filenamePrefix) {
         validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
 
@@ -516,7 +533,7 @@ public class TextIO {
        */
       public Bound<T> withSuffix(String nameExtension) {
         validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards,
+        return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards,
             shardTemplate, validate);
       }
 
@@ -536,7 +553,7 @@ public class TextIO {
        */
       public Bound<T> withNumShards(int numShards) {
         checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
 
@@ -549,7 +566,7 @@ public class TextIO {
        * @see ShardNameTemplate
        */
       public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
 
@@ -567,7 +584,8 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate);
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "",
+            validate);
       }
 
       /**
@@ -579,7 +597,7 @@ public class TextIO {
        * @param <X> the type of the elements of the input {@link PCollection}
        */
       public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
 
@@ -594,7 +612,17 @@ public class TextIO {
        * <p>Does not modify this object.
        */
       public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+            shardTemplate, false);
+      }
+
+      public Bound<T> withHeader(String header) {
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+            shardTemplate, false);
+      }
+
+      public Bound<T> withFooter(String footer) {
+        return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
@@ -607,7 +635,8 @@ public class TextIO {
 
         org.apache.beam.sdk.io.Write.Bound<T> write =
             org.apache.beam.sdk.io.Write.to(
-                new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder));
+                new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+                    coder));
         if (getNumShards() > 0) {
           write = write.withNumShards(getNumShards());
         }
@@ -629,7 +658,11 @@ public class TextIO {
             .addIfNotDefault(DisplayData.item("validation", validate)
               .withLabel("Validation Enabled"), true)
             .addIfNotDefault(DisplayData.item("numShards", numShards)
-              .withLabel("Maximum Output Shards"), 0);
+              .withLabel("Maximum Output Shards"), 0)
+            .addIfNotNull(DisplayData.item("fileHeader", header)
+              .withLabel("Output file header"))
+            .addIfNotNull(DisplayData.item("fileFooter", footer)
+                .withLabel("Output file footer"));
       }
 
       /**
@@ -664,6 +697,14 @@ public class TextIO {
         return coder;
       }
 
+      public String getHeader() {
+        return header;
+      }
+
+      public String getFooter() {
+        return footer;
+      }
+
       public boolean needsValidation() {
         return validate;
       }
@@ -946,17 +987,22 @@ public class TextIO {
   @VisibleForTesting
   static class TextSink<T> extends FileBasedSink<T> {
     private final Coder<T> coder;
+    private final String header;
+    private final String footer;
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) {
+        String baseOutputFilename, String extension, String header, String footer,
+        String fileNameTemplate, Coder<T> coder) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
+      this.header = header;
+      this.footer = footer;
     }
 
     @Override
     public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new TextWriteOperation<>(this, coder);
+      return new TextWriteOperation<>(this, coder, header, footer);
     }
 
     /**
@@ -965,15 +1011,19 @@ public class TextIO {
      */
     private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final Coder<T> coder;
+      private final String header;
+      private final String footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
         super(sink);
         this.coder = coder;
+        this.header = header;
+        this.footer = footer;
       }
 
       @Override
       public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new TextWriter<>(this, coder);
+        return new TextWriter<>(this, coder, header, footer);
       }
     }
 
@@ -984,20 +1034,50 @@ public class TextIO {
     private static class TextWriter<T> extends FileBasedWriter<T> {
       private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
       private final Coder<T> coder;
+      private final String header;
+      private final String footer;
       private OutputStream out;
 
       public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
+        this(writeOperation, coder, null, null);
+      }
+
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
+        this(writeOperation, coder, header, null);
+      }
+
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
+                        String footer) {
         super(writeOperation);
+        this.header = header;
+        this.footer = footer;
         this.mimeType = MimeTypes.TEXT;
         this.coder = coder;
       }
 
+      private void writeLine(String line) throws IOException {
+        if (line != null) {
+          out.write(line.getBytes(StandardCharsets.UTF_8));
+          out.write(NEWLINE);
+        }
+      }
+
       @Override
       protected void prepareWrite(WritableByteChannel channel) throws Exception {
         out = Channels.newOutputStream(channel);
       }
 
       @Override
+      protected void writeHeader() throws Exception {
+        writeLine(header);
+      }
+
+      @Override
+      protected void writeFooter() throws Exception {
+        writeLine(footer);
+      }
+
+      @Override
       public void write(T value) throws Exception {
         coder.encode(value, out, Context.OUTER);
         out.write(NEWLINE);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4691231b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8f94766..2ab2683 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -48,6 +50,7 @@ import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
@@ -101,6 +104,9 @@ import org.mockito.stubbing.Answer;
 @SuppressWarnings("unchecked")
 public class TextIOTest {
 
+  private static final String MY_HEADER = "myHeader";
+  private static final String MY_FOOTER = "myFooter";
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
   @Rule
@@ -216,10 +222,19 @@ public class TextIOTest {
   }
 
   <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
-    runTestWrite(elems, coder, 1);
+    runTestWrite(elems, null, null, coder, 1);
   }
 
   <T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
+    runTestWrite(elems, null, null, coder, numShards);
+  }
+
+  <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
+    runTestWrite(elems, header, footer, coder, 1);
+  }
+
+  <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
+      throws Exception {
     String outputName = "file.txt";
     String baseFilename = tmpFolder.newFile(outputName).getPath();
 
@@ -235,6 +250,8 @@ public class TextIOTest {
     } else {
       write = TextIO.Write.to(baseFilename).withCoder(coder);
     }
+    write = write.withHeader(header).withFooter(footer);
+
     if (numShards == 1) {
       write = write.withoutSharding();
     } else if (numShards > 0) {
@@ -244,11 +261,14 @@ public class TextIOTest {
 
     p.run();
 
-    assertOutputFiles(elems, coder, numShards, tmpFolder, outputName, write.getShardNameTemplate());
+    assertOutputFiles(elems, header, footer, coder, numShards, tmpFolder, outputName,
+        write.getShardNameTemplate());
   }
 
   public static <T> void assertOutputFiles(
       T[] elems,
+      String header,
+      String footer,
       Coder<T> coder,
       int numShards,
       TemporaryFolder rootLocation,
@@ -284,15 +304,23 @@ public class TextIOTest {
       }
     }
 
-    String[] expected = new String[elems.length];
+    LinkedList<String> expected = Lists.newLinkedList();
+
     for (int i = 0; i < elems.length; i++) {
       T elem = elems[i];
       byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
       String line = new String(encodedElem);
-      expected[i] = line;
+      expected.add(line);
+    }
+
+    if (header != null) {
+      expected.addFirst(header);
+    }
+    if (footer != null) {
+      expected.addLast(footer);
     }
 
-    assertThat(actual, containsInAnyOrder(expected));
+    assertThat(actual, containsInAnyOrder(expected.toArray()));
   }
 
   @Test
@@ -332,18 +360,40 @@ public class TextIOTest {
   }
 
   @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithHeader() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteWithHeaderAndFooter() throws Exception {
+    runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER);
+  }
+
+  @Test
   public void testWriteDisplayData() {
     TextIO.Write.Bound<?> write = TextIO.Write
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
         .withNumShards(100)
+        .withFooter("myFooter")
+        .withHeader("myHeader")
         .withoutValidation();
 
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+    assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
+    assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("validation", false));


[5/8] incubator-beam git commit: Added javadoc to TextIO#withHeader and TextIO#withFooter.

Posted by lc...@apache.org.
Added javadoc to TextIO#withHeader and TextIO#withFooter.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/94c58b61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/94c58b61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/94c58b61

Branch: refs/heads/master
Commit: 94c58b61411e78da8bb844e4167a51fb2894692e
Parents: 0e8ff41
Author: Stas Levin <st...@gmail.com>
Authored: Wed Sep 7 18:33:48 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:31 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/TextIO.java    | 12 ++++++++++++
 1 file changed, 12 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/94c58b61/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 0895123..e75aaa9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -448,10 +448,22 @@ public class TextIO {
       return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
     }
 
+    /**
+     * Returns a transform for writing to text files that adds a header string to the files
+     * it writes.
+     *
+     * @param header the string to be added as file header
+     */
     public static Bound<String> withHeader(String header) {
       return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
     }
 
+    /**
+     * Returns a transform for writing to text files that adds a footer string to the files
+     * it writes.
+     *
+     * @param footer the string to be added as file footer
+     */
     public static Bound<String> withFooter(String footer) {
       return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
     }


[8/8] incubator-beam git commit: [BEAM-544] Add header/footer support to TextIO.Write

Posted by lc...@apache.org.
[BEAM-544] Add header/footer support to TextIO.Write

This closes #918


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f33296c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f33296c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f33296c7

Branch: refs/heads/master
Commit: f33296c7f97e5cb8e11124bccd1665f365dbb425
Parents: 26635d7 ce1294d
Author: Luke Cwik <lc...@google.com>
Authored: Wed Sep 7 15:39:02 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:39:02 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 150 ++++++++++++++++---
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 105 ++++++++++++-
 2 files changed, 231 insertions(+), 24 deletions(-)
----------------------------------------------------------------------