You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:18 UTC

[47/50] [abbrv] incubator-beam git commit: Reverted header and footer to be of type String.

Reverted header and footer to be of type String.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/092a1870
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/092a1870
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/092a1870

Branch: refs/heads/gearpump-runner
Commit: 092a1870fc84067ae0e19a736a37160a9a55c653
Parents: 1b420db
Author: Stas Levin <st...@gmail.com>
Authored: Wed Sep 7 09:57:17 2016 +0300
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 12 17:40:14 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 68 ++++++++------------
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 68 +++++++++++---------
 2 files changed, 63 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/092a1870/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index eefa867..0895123 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -36,14 +35,12 @@ import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -477,10 +474,10 @@ public class TextIO {
       private final String filenameSuffix;
 
       /** An optional header to add to each file. */
-      private final T header;
+      private final String header;
 
       /** An optional footer to add to each file. */
-      private final T footer;
+      private final String footer;
 
       /** The Coder to use to decode each line. */
       private final Coder<T> coder;
@@ -498,8 +495,8 @@ public class TextIO {
         this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
       }
 
-      private Bound(String name, String filenamePrefix, String filenameSuffix, T header,
-                    T footer, Coder<T> coder, int numShards, String shardTemplate,
+      private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
+                    String footer, Coder<T> coder, int numShards, String shardTemplate,
                     boolean validate) {
         super(name);
         this.header = header;
@@ -512,14 +509,6 @@ public class TextIO {
         this.validate = validate;
       }
 
-      private String asString(T obj, Coder<T> coder) {
-        try {
-          return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj));
-        } catch (CoderException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
       /**
        * Returns a transform for writing to text files that's like this one but
        * that writes to the file(s) with the given filename prefix.
@@ -605,8 +594,9 @@ public class TextIO {
        * the elements of the input {@link PCollection PCollection<X>} into an
        * output text line. Does not modify this object.
        *
+       * @param <X> the type of the elements of the input {@link PCollection}
        */
-      public Bound<?> withCoder(Coder<? super T> coder) {
+      public <X> Bound<X> withCoder(Coder<X> coder) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, validate);
       }
@@ -626,12 +616,12 @@ public class TextIO {
             shardTemplate, false);
       }
 
-      public Bound<T> withHeader(T header) {
+      public Bound<T> withHeader(String header) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
 
-      public Bound<T> withFooter(T footer) {
+      public Bound<T> withFooter(String footer) {
         return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
             shardTemplate, false);
       }
@@ -669,9 +659,9 @@ public class TextIO {
               .withLabel("Validation Enabled"), true)
             .addIfNotDefault(DisplayData.item("numShards", numShards)
               .withLabel("Maximum Output Shards"), 0)
-            .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder))
+            .addIfNotNull(DisplayData.item("fileHeader", header)
               .withLabel("File Header"))
-            .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder))
+            .addIfNotNull(DisplayData.item("fileFooter", footer)
                 .withLabel("File Footer"));
       }
 
@@ -707,11 +697,11 @@ public class TextIO {
         return coder;
       }
 
-      public T getHeader() {
+      public String getHeader() {
         return header;
       }
 
-      public T getFooter() {
+      public String getFooter() {
         return footer;
       }
 
@@ -997,21 +987,17 @@ public class TextIO {
   @VisibleForTesting
   static class TextSink<T> extends FileBasedSink<T> {
     private final Coder<T> coder;
-    private final byte[] header;
-    private final byte[] footer;
+    private final String header;
+    private final String footer;
 
     @VisibleForTesting
     TextSink(
-        String baseOutputFilename, String extension, T header, T footer,
+        String baseOutputFilename, String extension, String header, String footer,
         String fileNameTemplate, Coder<T> coder) {
       super(baseOutputFilename, extension, fileNameTemplate);
       this.coder = coder;
-      try {
-        this.header = header == null ? null : CoderUtils.encodeToByteArray(coder, header);
-        this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer);
-      } catch (CoderException e) {
-        throw new RuntimeException(e);
-      }
+      this.header = header;
+      this.footer = footer;
     }
 
     @Override
@@ -1025,10 +1011,10 @@ public class TextIO {
      */
     private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
       private final Coder<T> coder;
-      private final byte[] header;
-      private final byte[] footer;
+      private final String header;
+      private final String footer;
 
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) {
+      private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
         super(sink);
         this.coder = coder;
         this.header = header;
@@ -1048,20 +1034,20 @@ public class TextIO {
     private static class TextWriter<T> extends FileBasedWriter<T> {
       private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
       private final Coder<T> coder;
-      private final byte[] header;
-      private final byte[] footer;
+      private final String header;
+      private final String footer;
       private OutputStream out;
 
       public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
         this(writeOperation, coder, null, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
         this(writeOperation, coder, header, null);
       }
 
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header,
-                        byte[] footer) {
+      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
+                        String footer) {
         super(writeOperation);
         this.header = header;
         this.footer = footer;
@@ -1069,9 +1055,9 @@ public class TextIO {
         this.coder = coder;
       }
 
-      private void writeLine(byte[] line) throws IOException {
+      private void writeLine(String line) throws IOException {
         if (line != null) {
-          out.write(line);
+          out.write(line.getBytes(StandardCharsets.UTF_8));
           out.write(NEWLINE);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/092a1870/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 7028761..c60b735 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -233,11 +233,11 @@ public class TextIOTest {
     runTestWrite(elems, null, null, coder, numShards);
   }
 
-  <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception {
+  <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
     runTestWrite(elems, header, footer, coder, 1);
   }
 
-  <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards)
+  <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
       throws Exception {
     String outputName = "file.txt";
     String baseFilename = tmpFolder.newFile(outputName).getPath();
@@ -252,7 +252,7 @@ public class TextIOTest {
       // T==String
       write = (TextIO.Write.Bound<T>) writeStrings;
     } else {
-      write = TextIO.Write.withCoder(coder).to(baseFilename);
+      write = TextIO.Write.to(baseFilename).withCoder(coder);
     }
     write = write.withHeader(header).withFooter(footer);
 
@@ -271,9 +271,9 @@ public class TextIOTest {
 
   public static <T> void assertOutputFiles(
       T[] elems,
-      final T header,
-      final T footer,
-      final Coder<T> coder,
+      final String header,
+      final String footer,
+      Coder<T> coder,
       int numShards,
       TemporaryFolder rootLocation,
       String outputName,
@@ -320,44 +320,48 @@ public class TextIOTest {
       expectedElements.add(line);
     }
 
-    final String headerString =
-        header == null ? null : new String(CoderUtils.encodeToByteArray(coder, header));
-
-    final String footerString =
-        footer == null ? null : new String(CoderUtils.encodeToByteArray(coder, footer));
-
     ArrayList<String> actualElements =
         Lists.newArrayList(
             Iterables.concat(
                 FluentIterable
                     .from(actual)
-                    .transform(new Function<List<String>, List<String>>() {
-                      @Nullable
-                      @Override
-                      public List<String> apply(List<String> lines) {
-                        ArrayList<String> newLines = Lists.newArrayList(lines);
-                        if (headerString != null) {
-                          newLines.remove(0);
-                        }
-                        if (footerString != null) {
-                          int last = newLines.size() - 1;
-                          newLines.remove(last);
-                        }
-                        return newLines;
-                      }
-                    })
+                    .transform(removeHeaderAndFooter(header, footer))
                     .toList()));
 
     assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
 
-    assertTrue(Iterables.all(actual, new Predicate<List<String>>() {
+    assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer)));
+  }
+
+  private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header,
+                                                                            final String footer) {
+    return new Function<List<String>, List<String>>() {
+      @Nullable
+      @Override
+      public List<String> apply(List<String> lines) {
+        ArrayList<String> newLines = Lists.newArrayList(lines);
+        if (header != null) {
+          newLines.remove(0);
+        }
+        if (footer != null) {
+          int last = newLines.size() - 1;
+          newLines.remove(last);
+        }
+        return newLines;
+      }
+    };
+  }
+
+  private static Predicate<List<String>> haveProperHeaderAndFooter(final String header,
+                                                                   final String footer) {
+    return new Predicate<List<String>>() {
       @Override
-      public boolean apply(@Nullable List<String> fileLines) {
+      public boolean apply(List<String> fileLines) {
         int last = fileLines.size() - 1;
-        return (headerString == null || fileLines.get(0).equals(headerString))
-              && (footerString == null || fileLines.get(last).equals(footerString));
+        return (header == null || fileLines.get(0).equals(header))
+            && (footer == null || fileLines.get(last).equals(footer));
       }
-    }));
+    };
   }
 
   @Test