You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/07 22:39:38 UTC
[2/8] incubator-beam git commit: Add header/footer support to TextIO.Write
Add header/footer support to TextIO.Write
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4691231b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4691231b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4691231b
Branch: refs/heads/master
Commit: 4691231b036950c5ca9c5c78b512933e61fae076
Parents: 26635d7
Author: Stas Levin <st...@gmail.com>
Authored: Mon Sep 5 20:26:12 2016 +0300
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Sep 7 15:35:30 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 114 ++++++++++++++++---
.../java/org/apache/beam/sdk/io/TextIOTest.java | 60 +++++++++-
2 files changed, 152 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4691231b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 242470b..c754a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -448,7 +448,15 @@ public class TextIO {
return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
}
- // TODO: appendingNewlines, header, footer, etc.
+ public static Bound<String> withHeader(String header) {
+ return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
+ }
+
+ public static Bound<String> withFooter(String footer) {
+ return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
+ }
+
+ // TODO: appendingNewlines, etc.
/**
* A PTransform that writes a bounded PCollection to a text file (or
@@ -465,6 +473,12 @@ public class TextIO {
/** The suffix of each file written, combined with prefix and shardTemplate. */
private final String filenameSuffix;
+ /** An optional header to add to each file. */
+ private final String header;
+
+ /** An optional footer to add to each file. */
+ private final String footer;
+
/** The Coder to use to decode each line. */
private final Coder<T> coder;
@@ -478,12 +492,15 @@ public class TextIO {
private final boolean validate;
Bound(Coder<T> coder) {
- this(null, null, "", coder, 0, DEFAULT_SHARD_TEMPLATE, true);
+ this(null, null, "", null, null, coder, 0, DEFAULT_SHARD_TEMPLATE, true);
}
- private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
- int numShards, String shardTemplate, boolean validate) {
+ private Bound(String name, String filenamePrefix, String filenameSuffix, String header,
+ String footer, Coder<T> coder, int numShards, String shardTemplate,
+ boolean validate) {
super(name);
+ this.header = header;
+ this.footer = footer;
this.coder = coder;
this.filenamePrefix = filenamePrefix;
this.filenameSuffix = filenameSuffix;
@@ -502,7 +519,7 @@ public class TextIO {
*/
public Bound<T> to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -516,7 +533,7 @@ public class TextIO {
*/
public Bound<T> withSuffix(String nameExtension) {
validateOutputComponent(nameExtension);
- return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards,
+ return new Bound<>(name, filenamePrefix, nameExtension, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -536,7 +553,7 @@ public class TextIO {
*/
public Bound<T> withNumShards(int numShards) {
checkArgument(numShards >= 0);
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -549,7 +566,7 @@ public class TextIO {
* @see ShardNameTemplate
*/
public Bound<T> withShardNameTemplate(String shardTemplate) {
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -567,7 +584,8 @@ public class TextIO {
* <p>Does not modify this object.
*/
public Bound<T> withoutSharding() {
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate);
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, 1, "",
+ validate);
}
/**
@@ -579,7 +597,7 @@ public class TextIO {
* @param <X> the type of the elements of the input {@link PCollection}
*/
public <X> Bound<X> withCoder(Coder<X> coder) {
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, validate);
}
@@ -594,7 +612,17 @@ public class TextIO {
* <p>Does not modify this object.
*/
public Bound<T> withoutValidation() {
- return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, false);
}
+
+ public Bound<T> withHeader(String header) {
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+ shardTemplate, validate); // forward the current setting; a header must not disable validation
+ }
+
+ public Bound<T> withFooter(String footer) {
+ return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
+ shardTemplate, validate); // forward the current setting; a footer must not disable validation
+ }
@@ -607,7 +635,8 @@ public class TextIO {
org.apache.beam.sdk.io.Write.Bound<T> write =
org.apache.beam.sdk.io.Write.to(
- new TextSink<>(filenamePrefix, filenameSuffix, shardTemplate, coder));
+ new TextSink<>(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+ coder));
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
@@ -629,7 +658,11 @@ public class TextIO {
.addIfNotDefault(DisplayData.item("validation", validate)
.withLabel("Validation Enabled"), true)
.addIfNotDefault(DisplayData.item("numShards", numShards)
- .withLabel("Maximum Output Shards"), 0);
+ .withLabel("Maximum Output Shards"), 0)
+ .addIfNotNull(DisplayData.item("fileHeader", header)
+ .withLabel("Output file header"))
+ .addIfNotNull(DisplayData.item("fileFooter", footer)
+ .withLabel("Output file footer"));
}
/**
@@ -664,6 +697,14 @@ public class TextIO {
return coder;
}
+ public String getHeader() {
+ return header;
+ }
+
+ public String getFooter() {
+ return footer;
+ }
+
public boolean needsValidation() {
return validate;
}
@@ -946,17 +987,22 @@ public class TextIO {
@VisibleForTesting
static class TextSink<T> extends FileBasedSink<T> {
private final Coder<T> coder;
+ private final String header;
+ private final String footer;
@VisibleForTesting
TextSink(
- String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) {
+ String baseOutputFilename, String extension, String header, String footer,
+ String fileNameTemplate, Coder<T> coder) {
super(baseOutputFilename, extension, fileNameTemplate);
this.coder = coder;
+ this.header = header;
+ this.footer = footer;
}
@Override
public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
- return new TextWriteOperation<>(this, coder);
+ return new TextWriteOperation<>(this, coder, header, footer);
}
/**
@@ -965,15 +1011,19 @@ public class TextIO {
*/
private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
private final Coder<T> coder;
+ private final String header;
+ private final String footer;
- private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
+ private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
super(sink);
this.coder = coder;
+ this.header = header;
+ this.footer = footer;
}
@Override
public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
- return new TextWriter<>(this, coder);
+ return new TextWriter<>(this, coder, header, footer);
}
}
@@ -984,20 +1034,50 @@ public class TextIO {
private static class TextWriter<T> extends FileBasedWriter<T> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
+ private final String header;
+ private final String footer;
private OutputStream out;
public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
+ this(writeOperation, coder, null, null);
+ }
+
+ public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
+ this(writeOperation, coder, header, null);
+ }
+
+ public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
+ String footer) {
super(writeOperation);
+ this.header = header;
+ this.footer = footer;
this.mimeType = MimeTypes.TEXT;
this.coder = coder;
}
+ private void writeLine(String line) throws IOException {
+ if (line != null) {
+ out.write(line.getBytes(StandardCharsets.UTF_8));
+ out.write(NEWLINE);
+ }
+ }
+
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
out = Channels.newOutputStream(channel);
}
@Override
+ protected void writeHeader() throws Exception {
+ writeLine(header);
+ }
+
+ @Override
+ protected void writeFooter() throws Exception {
+ writeLine(footer);
+ }
+
+ @Override
public void write(T value) throws Exception {
coder.encode(value, out, Context.OUTER);
out.write(NEWLINE);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4691231b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8f94766..2ab2683 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -34,6 +34,8 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
@@ -48,6 +50,7 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
@@ -101,6 +104,9 @@ import org.mockito.stubbing.Answer;
@SuppressWarnings("unchecked")
public class TextIOTest {
+ private static final String MY_HEADER = "myHeader";
+ private static final String MY_FOOTER = "myFooter";
+
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
@@ -216,10 +222,19 @@ public class TextIOTest {
}
<T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception {
- runTestWrite(elems, coder, 1);
+ runTestWrite(elems, null, null, coder, 1);
}
<T> void runTestWrite(T[] elems, Coder<T> coder, int numShards) throws Exception {
+ runTestWrite(elems, null, null, coder, numShards);
+ }
+
+ <T> void runTestWrite(T[] elems, Coder<T> coder, String header, String footer) throws Exception {
+ runTestWrite(elems, header, footer, coder, 1);
+ }
+
+ <T> void runTestWrite(T[] elems, String header, String footer, Coder<T> coder, int numShards)
+ throws Exception {
String outputName = "file.txt";
String baseFilename = tmpFolder.newFile(outputName).getPath();
@@ -235,6 +250,8 @@ public class TextIOTest {
} else {
write = TextIO.Write.to(baseFilename).withCoder(coder);
}
+ write = write.withHeader(header).withFooter(footer);
+
if (numShards == 1) {
write = write.withoutSharding();
} else if (numShards > 0) {
@@ -244,11 +261,14 @@ public class TextIOTest {
p.run();
- assertOutputFiles(elems, coder, numShards, tmpFolder, outputName, write.getShardNameTemplate());
+ assertOutputFiles(elems, header, footer, coder, numShards, tmpFolder, outputName,
+ write.getShardNameTemplate());
}
public static <T> void assertOutputFiles(
T[] elems,
+ String header,
+ String footer,
Coder<T> coder,
int numShards,
TemporaryFolder rootLocation,
@@ -284,15 +304,23 @@ public class TextIOTest {
}
}
- String[] expected = new String[elems.length];
+ LinkedList<String> expected = Lists.newLinkedList();
+
for (int i = 0; i < elems.length; i++) {
T elem = elems[i];
byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
String line = new String(encodedElem);
- expected[i] = line;
+ expected.add(line);
+ }
+
+ if (header != null) {
+ expected.addFirst(header);
+ }
+ if (footer != null) {
+ expected.addLast(footer);
}
- assertThat(actual, containsInAnyOrder(expected));
+ assertThat(actual, containsInAnyOrder(expected.toArray()));
}
@Test
@@ -332,18 +360,40 @@ public class TextIOTest {
}
@Test
+ @Category(NeedsRunner.class)
+ public void testWriteWithHeader() throws Exception {
+ runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, null);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWriteWithFooter() throws Exception {
+ runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), null, MY_FOOTER);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testWriteWithHeaderAndFooter() throws Exception {
+ runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), MY_HEADER, MY_FOOTER);
+ }
+
+ @Test
public void testWriteDisplayData() {
TextIO.Write.Bound<?> write = TextIO.Write
.to("foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
.withNumShards(100)
+ .withFooter("myFooter")
+ .withHeader("myHeader")
.withoutValidation();
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
+ assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
+ assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
assertThat(displayData, hasDisplayItem("numShards", 100));
assertThat(displayData, hasDisplayItem("validation", false));