You are viewing a plain-text version of this content. (The hyperlink to the canonical version is not preserved in this plain-text rendering.)
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/07 22:39:40 UTC
[4/8] incubator-beam git commit: Reverted header and footer to be of type String.
Reverted header and footer to be of type String.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0e8ff41e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0e8ff41e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0e8ff41e
Branch: refs/heads/master
Commit: 0e8ff41e34c9bc332148487c7c52ed99051b5ff7
Parents: af7437d
Author: Stas Levin <st...@gmail.com>
Authored: Wed Sep 7 09:57:17 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:31 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 68 ++++++++------------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 68 +++++++++++---------
2 files changed, 63 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e8ff41e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index eefa867..0895123 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
-
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -36,14 +35,12 @@ import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
@@ -477,10 +474,10 @@ public class TextIO {
private final String filenameSuffix;
/** An optional header to add to each file. */
- private final T header;
+ private final String header;
/** An optional footer to add to each file. */
- private final T footer;
+ private final String footer;
/** The Coder to use to decode each line. */
private final Coder<T> coder;
@@ -498,8 +495,8 @@ public class TextIO {
this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
}
- private Bound(String name, String filenamePrefix, String filenameSuffix, T header,
- T footer, Coder<T> coder, int numShards, String shardTemplate,
+ private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
+ String footer, Coder<T> coder, int numShards, String shardTemplate,
boolean validate) {
super(name);
this.header = header;
@@ -512,14 +509,6 @@ public class TextIO {
this.validate = validate;
}
- private String asString(T obj, Coder<T> coder) {
- try {
- return obj == null ? "" : new String(CoderUtils.encodeToByteArray(coder, obj));
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* Returns a transform for writing to text files that's like this one but
* that writes to the file(s) with the given filename prefix.
@@ -605,8 +594,9 @@ public class TextIO {
* the elements of the input {@link PCollection PCollection<X>} into an
* output text line. Does not modify this object.
*
+ * @param <X> the type of the elements of the input {@link PCollection}
*/
- public Bound<?> withCoder(Coder<? super T> coder) {
+ public <X> Bound<X> withCoder(Coder<X> coder) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -626,12 +616,12 @@ public class TextIO {
shardTemplate, false);
}
- public Bound<T> withHeader(T header) {
+ public Bound<T> withHeader(String header) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, false);
}
- public Bound<T> withFooter(T footer) {
+ public Bound<T> withFooter(String footer) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, false);
}
@@ -669,9 +659,9 @@ public class TextIO {
.withLabel("Validation Enabled"), true)
.addIfNotDefault(DisplayData.item("numShards", numShards)
.withLabel("Maximum Output Shards"), 0)
- .addIfNotNull(DisplayData.item("fileHeader", asString(header, coder))
+ .addIfNotNull(DisplayData.item("fileHeader", header)
.withLabel("File Header"))
- .addIfNotNull(DisplayData.item("fileFooter", asString(footer, coder))
+ .addIfNotNull(DisplayData.item("fileFooter", footer)
.withLabel("File Footer"));
}
@@ -707,11 +697,11 @@ public class TextIO {
return coder;
}
- public T getHeader() {
+ public String getHeader() {
return header;
}
- public T getFooter() {
+ public String getFooter() {
return footer;
}
@@ -997,21 +987,17 @@ public class TextIO {
@VisibleForTesting
static class TextSink<T> extends FileBasedSink<T> {
private final Coder<T> coder;
- private final byte[] header;
- private final byte[] footer;
+ private final String header;
+ private final String footer;
@VisibleForTesting
TextSink(
- String baseOutputFilename, String extension, T header, T footer,
+ String baseOutputFilename, String extension, String header, String footer,
String fileNameTemplate, Coder<T> coder) {
super(baseOutputFilename, extension, fileNameTemplate);
this.coder = coder;
- try {
- this.header = header == null ? null : CoderUtils.encodeToByteArray(coder, header);
- this.footer = footer == null ? null : CoderUtils.encodeToByteArray(coder, footer);
- } catch (CoderException e) {
- throw new RuntimeException(e);
- }
+ this.header = header;
+ this.footer = footer;
}
@Override
@@ -1025,10 +1011,10 @@ public class TextIO {
*/
private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
private final Coder<T> coder;
- private final byte[] header;
- private final byte[] footer;
+ private final String header;
+ private final String footer;
- private TextWriteOperation(TextSink<T> sink, Coder<T> coder, byte[] header, byte[] footer) {
+ private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
super(sink);
this.coder = coder;
this.header = header;
@@ -1048,20 +1034,20 @@ public class TextIO {
private static class TextWriter<T> extends FileBasedWriter<T> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
- private final byte[] header;
- private final byte[] footer;
+ private final String header;
+ private final String footer;
private OutputStream out;
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
this(writeOperation, coder, null, null);
}
- public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header) {
+ public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
this(writeOperation, coder, header, null);
}
- public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, byte[] header,
- byte[] footer) {
+ public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
+ String footer) {
super(writeOperation);
this.header = header;
this.footer = footer;
@@ -1069,9 +1055,9 @@ public class TextIO {
this.coder = coder;
}
- private void writeLine(byte[] line) throws IOException {
+ private void writeLine(String line) throws IOException {
if (line != null) {
- out.write(line);
+ out.write(line.getBytes(StandardCharsets.UTF_8));
out.write(NEWLINE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0e8ff41e/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 7028761..c60b735 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -233,11 +233,11 @@ public class TextIOTest {
runTestWrite(elems, null, null, coder, numShards);
}
- <T> void runTestWrite(T[] elems, Coder<T> coder, T header, T footer) throws Exception {
+ <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
runTestWrite(elems, header, footer, coder, 1);
}
- <T> void runTestWrite(T[] elems, T header, T footer, Coder<T> coder, int numShards)
+ <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
throws Exception {
String outputName = "file.txt";
String baseFilename = tmpFolder.newFile(outputName).getPath();
@@ -252,7 +252,7 @@ public class TextIOTest {
// T==String
write = (TextIO.Write.Bound<T>) writeStrings;
} else {
- write = TextIO.Write.withCoder(coder).to(baseFilename);
+ write = TextIO.Write.to(baseFilename).withCoder(coder);
}
write = write.withHeader(header).withFooter(footer);
@@ -271,9 +271,9 @@ public class TextIOTest {
public static <T> void assertOutputFiles(
T[] elems,
- final T header,
- final T footer,
- final Coder<T> coder,
+ final String header,
+ final String footer,
+ Coder<T> coder,
int numShards,
TemporaryFolder rootLocation,
String outputName,
@@ -320,44 +320,48 @@ public class TextIOTest {
expectedElements.add(line);
}
- final String headerString =
- header == null ? null : new String(CoderUtils.encodeToByteArray(coder, header));
-
- final String footerString =
- footer == null ? null : new String(CoderUtils.encodeToByteArray(coder, footer));
-
ArrayList<String> actualElements =
Lists.newArrayList(
Iterables.concat(
FluentIterable
.from(actual)
- .transform(new Function<List<String>, List<String>>() {
- @Nullable
- @Override
- public List<String> apply(List<String> lines) {
- ArrayList<String> newLines = Lists.newArrayList(lines);
- if (headerString != null) {
- newLines.remove(0);
- }
- if (footerString != null) {
- int last = newLines.size() - 1;
- newLines.remove(last);
- }
- return newLines;
- }
- })
+ .transform(removeHeaderAndFooter(header, footer))
.toList()));
assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
- assertTrue(Iterables.all(actual, new Predicate<List<String>>() {
+ assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer)));
+ }
+
+ private static Function<List<String>, List<String>> removeHeaderAndFooter(final String header,
+ final String footer) {
+ return new Function<List<String>, List<String>>() {
+ @Nullable
+ @Override
+ public List<String> apply(List<String> lines) {
+ ArrayList<String> newLines = Lists.newArrayList(lines);
+ if (header != null) {
+ newLines.remove(0);
+ }
+ if (footer != null) {
+ int last = newLines.size() - 1;
+ newLines.remove(last);
+ }
+ return newLines;
+ }
+ };
+ }
+
+ private static Predicate<List<String>> haveProperHeaderAndFooter(final String header,
+ final String footer) {
+ return new Predicate<List<String>>() {
@Override
- public boolean apply(@Nullable List<String> fileLines) {
+ public boolean apply(List<String> fileLines) {
int last = fileLines.size() - 1;
- return (headerString == null || fileLines.get(0).equals(headerString))
- && (footerString == null || fileLines.get(last).equals(footerString));
+ return (header == null || fileLines.get(0).equals(header))
+ && (footer == null || fileLines.get(last).equals(footer));
}
- }));
+ };
}
@Test