You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/09/08 17:19:07 UTC
incubator-beam git commit: Address comments during backport Dataflow
PR/423 for Apache Beam PR/918
Repository: incubator-beam
Updated Branches:
refs/heads/master fb322cc73 -> 8b39a2ea3
Address comments during backport Dataflow PR/423 for Apache Beam PR/918
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8b39a2ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8b39a2ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8b39a2ea
Branch: refs/heads/master
Commit: 8b39a2ea3e2a70b9c07c91c375f79b9f966ab288
Parents: fb322cc
Author: Luke Cwik <lc...@google.com>
Authored: Thu Sep 8 09:34:43 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Sep 8 10:18:49 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 71 ++++++++++----------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 13 ++--
2 files changed, 40 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b39a2ea/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index c0761b1..3345cd3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -450,25 +450,25 @@ public class TextIO {
/**
* Returns a transform for writing to text files that adds a header string to the files
- * it writes.
+ * it writes. Note that a newline character will be added after the header.
*
* <p>A {@code null} value will clear any previously configured header.
*
* @param header the string to be added as file header
*/
- public static Bound<String> withHeader(String header) {
+ public static Bound<String> withHeader(@Nullable String header) {
return new Bound<>(DEFAULT_TEXT_CODER).withHeader(header);
}
/**
* Returns a transform for writing to text files that adds a footer string to the files
- * it writes.
+ * it writes. Note that a newline character will be added after the footer.
*
* <p>A {@code null} value will clear any previously configured footer.
*
* @param footer the string to be added as file footer
*/
- public static Bound<String> withFooter(String footer) {
+ public static Bound<String> withFooter(@Nullable String footer) {
return new Bound<>(DEFAULT_TEXT_CODER).withFooter(footer);
}
@@ -490,10 +490,10 @@ public class TextIO {
private final String filenameSuffix;
/** An optional header to add to each file. */
- private final String header;
+ @Nullable private final String header;
/** An optional footer to add to each file. */
- private final String footer;
+ @Nullable private final String footer;
/** The Coder to use to decode each line. */
private final Coder<T> coder;
@@ -634,7 +634,7 @@ public class TextIO {
/**
* Returns a transform for writing to text files that adds a header string to the files
- * it writes.
+ * it writes. Note that a newline character will be added after the header.
*
* <p>A {@code null} value will clear any previously configured header.
*
@@ -642,14 +642,14 @@ public class TextIO {
*
* @param header the string to be added as file header
*/
- public Bound<T> withHeader(String header) {
+ public Bound<T> withHeader(@Nullable String header) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, false);
}
/**
* Returns a transform for writing to text files that adds a footer string to the files
- * it writes.
+ * it writes. Note that a newline character will be added after the footer.
*
* <p>A {@code null} value will clear any previously configured footer.
*
@@ -657,7 +657,7 @@ public class TextIO {
*
* @param footer the string to be added as file footer
*/
- public Bound<T> withFooter(String footer) {
+ public Bound<T> withFooter(@Nullable String footer) {
return new Bound<>(name, filenamePrefix, filenameSuffix, header, footer, coder, numShards,
shardTemplate, false);
}
@@ -733,10 +733,12 @@ public class TextIO {
return coder;
}
+ @Nullable
public String getHeader() {
return header;
}
+ @Nullable
public String getFooter() {
return footer;
}
@@ -805,7 +807,7 @@ public class TextIO {
private TextIO() {}
/**
- * A {@link FileBasedSource} which can decode records delimited by new line characters.
+ * A {@link FileBasedSource} which can decode records delimited by newline characters.
*
* <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
* {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
@@ -853,7 +855,7 @@ public class TextIO {
/**
* A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
- * which can decode records delimited by new line characters.
+ * which can decode records delimited by newline characters.
*
* See {@link TextSource} for further details.
*/
@@ -1016,19 +1018,20 @@ public class TextIO {
}
/**
- * A {@link FileBasedSink} for text files. Produces text files with the new line separator
+ * A {@link FileBasedSink} for text files. Produces text files with the newline separator
* {@code '\n'} represented in {@code UTF-8} format as the record separator.
* Each record (including the last) is terminated.
*/
@VisibleForTesting
static class TextSink<T> extends FileBasedSink<T> {
private final Coder<T> coder;
- private final String header;
- private final String footer;
+ @Nullable private final String header;
+ @Nullable private final String footer;
@VisibleForTesting
TextSink(
- String baseOutputFilename, String extension, String header, String footer,
+ String baseOutputFilename, String extension,
+ @Nullable String header, @Nullable String footer,
String fileNameTemplate, Coder<T> coder) {
super(baseOutputFilename, extension, fileNameTemplate);
this.coder = coder;
@@ -1047,10 +1050,11 @@ public class TextIO {
*/
private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
private final Coder<T> coder;
- private final String header;
- private final String footer;
+ @Nullable private final String header;
+ @Nullable private final String footer;
- private TextWriteOperation(TextSink<T> sink, Coder<T> coder, String header, String footer) {
+ private TextWriteOperation(TextSink<T> sink, Coder<T> coder,
+ @Nullable String header, @Nullable String footer) {
super(sink);
this.coder = coder;
this.header = header;
@@ -1070,20 +1074,12 @@ public class TextIO {
private static class TextWriter<T> extends FileBasedWriter<T> {
private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
private final Coder<T> coder;
- private final String header;
- private final String footer;
+ @Nullable private final String header;
+ @Nullable private final String footer;
private OutputStream out;
- public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
- this(writeOperation, coder, null, null);
- }
-
- public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header) {
- this(writeOperation, coder, header, null);
- }
-
- public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder, String header,
- String footer) {
+ public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder,
+ @Nullable String header, @Nullable String footer) {
super(writeOperation);
this.header = header;
this.footer = footer;
@@ -1091,9 +1087,12 @@ public class TextIO {
this.coder = coder;
}
- private void writeLine(String line) throws IOException {
- if (line != null) {
- out.write(line.getBytes(StandardCharsets.UTF_8));
+ /**
+ * Writes {@code value} followed by a newline if {@code value} is not null.
+ */
+ private void writeIfNotNull(@Nullable String value) throws IOException {
+ if (value != null) {
+ out.write(value.getBytes(StandardCharsets.UTF_8));
out.write(NEWLINE);
}
}
@@ -1105,12 +1104,12 @@ public class TextIO {
@Override
protected void writeHeader() throws Exception {
- writeLine(header);
+ writeIfNotNull(header);
}
@Override
protected void writeFooter() throws Exception {
- writeLine(footer);
+ writeIfNotNull(footer);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8b39a2ea/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index c60b735..859602a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -39,7 +39,6 @@ import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
@@ -54,7 +53,6 @@ import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.zip.GZIPOutputStream;
@@ -299,7 +297,7 @@ public class TextIOTest {
for (File tmpFile : expectedFiles) {
try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
- List<String> currentFile = Lists.newArrayList();
+ List<String> currentFile = new ArrayList<>();
for (;;) {
String line = reader.readLine();
if (line == null) {
@@ -311,8 +309,7 @@ public class TextIOTest {
}
}
- LinkedList<String> expectedElements = Lists.newLinkedList();
-
+ List<String> expectedElements = new ArrayList<>(elems.length);
for (int i = 0; i < elems.length; i++) {
T elem = elems[i];
byte[] encodedElem = CoderUtils.encodeToByteArray(coder, elem);
@@ -320,7 +317,7 @@ public class TextIOTest {
expectedElements.add(line);
}
- ArrayList<String> actualElements =
+ List<String> actualElements =
Lists.newArrayList(
Iterables.concat(
FluentIterable
@@ -942,8 +939,8 @@ public class TextIOTest {
@Test
public void testProgressAfterSplitting() throws IOException {
String file = "line1\nline2\nline3";
- BoundedSource source = prepareSource(file.getBytes());
- BoundedSource remainder;
+ BoundedSource<String> source = prepareSource(file.getBytes());
+ BoundedSource<String> remainder;
// Create the remainder, verifying properties pre- and post-splitting.
try (BoundedReader<String> readerOrig = source.createReader(PipelineOptionsFactory.create())) {