Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 19:23:19 UTC
[1/7] beam git commit: Moves TextSource and TextSink to top level
Repository: beam
Updated Branches:
refs/heads/master c2c89eda9 -> 3161904d9
Moves TextSource and TextSink to top level
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b725c25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b725c25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b725c25
Branch: refs/heads/master
Commit: 7b725c25288ae24eb89be3bf61e09e0e38c2b200
Parents: 681b5d6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:46:44 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 343 +------------------
.../java/org/apache/beam/sdk/io/TextSink.java | 139 ++++++++
.../java/org/apache/beam/sdk/io/TextSource.java | 236 +++++++++++++
.../java/org/apache/beam/sdk/io/TextIOTest.java | 3 +-
4 files changed, 377 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90d56e7..1f9b7a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,20 +19,8 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
@@ -41,13 +29,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
@@ -412,7 +397,7 @@ public class TextIO {
throw new IllegalStateException(
"cannot set both a filename policy and a filename prefix");
}
- WriteFiles<String> write = null;
+ WriteFiles<String> write;
if (getFilenamePolicy() != null) {
write =
WriteFiles.to(
@@ -535,330 +520,4 @@ public class TextIO {
/** Disable construction of utility class. */
private TextIO() {}
-
- /**
- * A {@link FileBasedSource} which can decode records delimited by newline characters.
- *
- * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
- * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
- * even if it is not delimited. Finally, no records are decoded if the stream is empty.
- *
- * <p>This source supports reading from any arbitrary byte position within the stream. If the
- * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
- * representing the beginning of the first record to be decoded.
- */
- @VisibleForTesting
- static class TextSource extends FileBasedSource<String> {
- /** The Coder to use to decode each line. */
- @VisibleForTesting
- TextSource(String fileSpec) {
- super(StaticValueProvider.of(fileSpec), 1L);
- }
-
- @VisibleForTesting
- TextSource(ValueProvider<String> fileSpec) {
- super(fileSpec, 1L);
- }
-
- private TextSource(Metadata metadata, long start, long end) {
- super(metadata, 1L, start, end);
- }
-
- @Override
- protected FileBasedSource<String> createForSubrangeOfFile(
- Metadata metadata,
- long start,
- long end) {
- return new TextSource(metadata, start, end);
- }
-
- @Override
- protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
- return new TextBasedReader(this);
- }
-
- @Override
- public Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
- * which can decode records delimited by newline characters.
- *
- * <p>See {@link TextSource} for further details.
- */
- @VisibleForTesting
- static class TextBasedReader extends FileBasedReader<String> {
- private static final int READ_BUFFER_SIZE = 8192;
- private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
- private ByteString buffer;
- private int startOfSeparatorInBuffer;
- private int endOfSeparatorInBuffer;
- private long startOfRecord;
- private volatile long startOfNextRecord;
- private volatile boolean eof;
- private volatile boolean elementIsPresent;
- private String currentValue;
- private ReadableByteChannel inChannel;
-
- private TextBasedReader(TextSource source) {
- super(source);
- buffer = ByteString.EMPTY;
- }
-
- @Override
- protected long getCurrentOffset() throws NoSuchElementException {
- if (!elementIsPresent) {
- throw new NoSuchElementException();
- }
- return startOfRecord;
- }
-
- @Override
- public long getSplitPointsRemaining() {
- if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
- return isDone() ? 0 : 1;
- }
- return super.getSplitPointsRemaining();
- }
-
- @Override
- public String getCurrent() throws NoSuchElementException {
- if (!elementIsPresent) {
- throw new NoSuchElementException();
- }
- return currentValue;
- }
-
- @Override
- protected void startReading(ReadableByteChannel channel) throws IOException {
- this.inChannel = channel;
- // If the first offset is greater than zero, we need to skip bytes until we see our
- // first separator.
- if (getCurrentSource().getStartOffset() > 0) {
- checkState(channel instanceof SeekableByteChannel,
- "%s only supports reading from a SeekableByteChannel when given a start offset"
- + " greater than 0.", TextSource.class.getSimpleName());
- long requiredPosition = getCurrentSource().getStartOffset() - 1;
- ((SeekableByteChannel) channel).position(requiredPosition);
- findSeparatorBounds();
- buffer = buffer.substring(endOfSeparatorInBuffer);
- startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
- endOfSeparatorInBuffer = 0;
- startOfSeparatorInBuffer = 0;
- }
- }
-
- /**
- * Locates the start position and end position of the next delimiter. Will
- * consume the channel until either EOF or the delimiter bounds are found.
- *
- * <p>This fills the buffer and updates the positions as follows:
- * <pre>{@code
- * ------------------------------------------------------
- * | element bytes | delimiter bytes | unconsumed bytes |
- * ------------------------------------------------------
- * 0 start of end of buffer
- * separator separator size
- * in buffer in buffer
- * }</pre>
- */
- private void findSeparatorBounds() throws IOException {
- int bytePositionInBuffer = 0;
- while (true) {
- if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
- startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
- break;
- }
-
- byte currentByte = buffer.byteAt(bytePositionInBuffer);
-
- if (currentByte == '\n') {
- startOfSeparatorInBuffer = bytePositionInBuffer;
- endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
- break;
- } else if (currentByte == '\r') {
- startOfSeparatorInBuffer = bytePositionInBuffer;
- endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
- if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
- currentByte = buffer.byteAt(bytePositionInBuffer + 1);
- if (currentByte == '\n') {
- endOfSeparatorInBuffer += 1;
- }
- }
- break;
- }
-
- // Move to the next byte in buffer.
- bytePositionInBuffer += 1;
- }
- }
-
- @Override
- protected boolean readNextRecord() throws IOException {
- startOfRecord = startOfNextRecord;
- findSeparatorBounds();
-
- // If we have reached EOF and consumed all of the buffer then we know
- // that there are no more records.
- if (eof && buffer.size() == 0) {
- elementIsPresent = false;
- return false;
- }
-
- decodeCurrentElement();
- startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
- return true;
- }
-
- /**
- * Decodes the current element updating the buffer to only contain the unconsumed bytes.
- *
- * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
- * {@code endOfSeparatorInBuffer}.
- */
- private void decodeCurrentElement() throws IOException {
- ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
- currentValue = dataToDecode.toStringUtf8();
- elementIsPresent = true;
- buffer = buffer.substring(endOfSeparatorInBuffer);
- }
-
- /**
- * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
- */
- private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
- // While we aren't at EOF and haven't fulfilled the minimum buffer capacity,
- // attempt to read more bytes.
- while (buffer.size() <= minCapacity && !eof) {
- eof = inChannel.read(readBuffer) == -1;
- readBuffer.flip();
- buffer = buffer.concat(ByteString.copyFrom(readBuffer));
- readBuffer.clear();
- }
- // Return true if we were able to honor the minimum buffer capacity request
- return buffer.size() >= minCapacity;
- }
- }
- }
-
- /**
- * A {@link FileBasedSink} for text files. Produces text files using the UTF-8 encoded
- * newline character {@code '\n'} as the record separator.
- * Every record, including the last, is terminated by the separator.
- */
- @VisibleForTesting
- static class TextSink extends FileBasedSink<String> {
- @Nullable private final String header;
- @Nullable private final String footer;
-
- @VisibleForTesting
- TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer,
- WritableByteChannelFactory writableByteChannelFactory) {
- super(filenamePolicy, writableByteChannelFactory);
- this.header = header;
- this.footer = footer;
- }
- @VisibleForTesting
- TextSink(
- ValueProvider<String> baseOutputFilename,
- String extension,
- @Nullable String header,
- @Nullable String footer,
- String fileNameTemplate,
- WritableByteChannelFactory writableByteChannelFactory) {
- super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
- this.header = header;
- this.footer = footer;
- }
-
- @Override
- public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
- return new TextWriteOperation(this, header, footer);
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
- * FileBasedWriteOperation} for text files.
- */
- private static class TextWriteOperation extends FileBasedWriteOperation<String> {
- @Nullable private final String header;
- @Nullable private final String footer;
-
- private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
- super(sink);
- this.header = header;
- this.footer = footer;
- }
-
- @Override
- public FileBasedWriter createWriter(PipelineOptions options) throws Exception {
- return new TextWriter(this, header, footer);
- }
- }
-
- /**
- * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
- * for text files.
- */
- private static class TextWriter extends FileBasedWriter<String> {
- private static final String NEWLINE = "\n";
- @Nullable private final String header;
- @Nullable private final String footer;
- private OutputStreamWriter out;
-
- public TextWriter(
- FileBasedWriteOperation<String> writeOperation,
- @Nullable String header,
- @Nullable String footer) {
- super(writeOperation, MimeTypes.TEXT);
- this.header = header;
- this.footer = footer;
- }
-
- /**
- * Writes {@code value} followed by a newline character if {@code value} is not null.
- */
- private void writeIfNotNull(@Nullable String value) throws IOException {
- if (value != null) {
- writeLine(value);
- }
- }
-
- /**
- * Writes {@code value} followed by a newline character.
- */
- private void writeLine(String value) throws IOException {
- out.write(value);
- out.write(NEWLINE);
- }
-
- @Override
- protected void prepareWrite(WritableByteChannel channel) throws Exception {
- out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
- }
-
- @Override
- protected void writeHeader() throws Exception {
- writeIfNotNull(header);
- }
-
- @Override
- public void write(String value) throws Exception {
- writeLine(value);
- }
-
- @Override
- protected void writeFooter() throws Exception {
- writeIfNotNull(footer);
- }
-
- @Override
- protected void finishWrite() throws Exception {
- out.flush();
- }
- }
- }
}
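One incidental cleanup in the hunk above: dropping the redundant null initializer on the local variable write lets javac's definite-assignment analysis verify that every branch assigns it before use. A minimal sketch of the pattern, with illustrative names rather than the actual TextIO code:

// Declared without an initializer, so the compiler rejects any path
// that could use 'write' before assigning it.
WriteFiles<String> write;
if (getFilenamePolicy() != null) {
  write = WriteFiles.to(sinkFromPolicy);   // 'sinkFromPolicy' is illustrative
} else {
  write = WriteFiles.to(sinkFromPrefix);   // 'sinkFromPrefix' is illustrative
}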
http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
new file mode 100644
index 0000000..4efdc32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * Implementation detail of {@link TextIO.Write}.
+ *
+ * <p>A {@link FileBasedSink} for text files. Produces text files using the UTF-8 encoded
+ * newline character {@code '\n'} as the record separator. Every record, including the
+ * last, is terminated by the separator.
+ */
+class TextSink extends FileBasedSink<String> {
+ @Nullable private final String header;
+ @Nullable private final String footer;
+
+ TextSink(
+ FilenamePolicy filenamePolicy,
+ @Nullable String header,
+ @Nullable String footer,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(filenamePolicy, writableByteChannelFactory);
+ this.header = header;
+ this.footer = footer;
+ }
+
+ TextSink(
+ ValueProvider<String> baseOutputFilename,
+ String extension,
+ @Nullable String header,
+ @Nullable String footer,
+ String fileNameTemplate,
+ WritableByteChannelFactory writableByteChannelFactory) {
+ super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
+ this.header = header;
+ this.footer = footer;
+ }
+
+ @Override
+ public FileBasedWriteOperation<String> createWriteOperation() {
+ return new TextWriteOperation(this, header, footer);
+ }
+
+ /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */
+ private static class TextWriteOperation extends FileBasedWriteOperation<String> {
+ @Nullable private final String header;
+ @Nullable private final String footer;
+
+ private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
+ super(sink);
+ this.header = header;
+ this.footer = footer;
+ }
+
+ @Override
+ public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+ return new TextWriter(this, header, footer);
+ }
+ }
+
+ /** A {@link FileBasedWriter FileBasedWriter} for text files. */
+ private static class TextWriter extends FileBasedWriter<String> {
+ private static final String NEWLINE = "\n";
+ @Nullable private final String header;
+ @Nullable private final String footer;
+ private OutputStreamWriter out;
+
+ public TextWriter(
+ FileBasedWriteOperation<String> writeOperation,
+ @Nullable String header,
+ @Nullable String footer) {
+ super(writeOperation, MimeTypes.TEXT);
+ this.header = header;
+ this.footer = footer;
+ }
+
+ /** Writes {@code value} followed by a newline character if {@code value} is not null. */
+ private void writeIfNotNull(@Nullable String value) throws IOException {
+ if (value != null) {
+ writeLine(value);
+ }
+ }
+
+ /** Writes {@code value} followed by a newline character. */
+ private void writeLine(String value) throws IOException {
+ out.write(value);
+ out.write(NEWLINE);
+ }
+
+ @Override
+ protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
+ }
+
+ @Override
+ protected void writeHeader() throws Exception {
+ writeIfNotNull(header);
+ }
+
+ @Override
+ public void write(String value) throws Exception {
+ writeLine(value);
+ }
+
+ @Override
+ protected void writeFooter() throws Exception {
+ writeIfNotNull(footer);
+ }
+
+ @Override
+ protected void finishWrite() throws Exception {
+ out.flush();
+ }
+ }
+}
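For orientation, a standalone sketch of the framing TextWriter produces: header, records, and footer are each UTF-8 encoded and newline-terminated, with no special casing for the final record. This is plain java.io rather than the Beam API, and the class and method names are illustrative:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TextFramingSketch {
  static byte[] frame(String header, Iterable<String> records, String footer)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    OutputStreamWriter out = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    if (header != null) {
      out.write(header);
      out.write("\n"); // the header is newline-terminated, like every record
    }
    for (String record : records) {
      out.write(record);
      out.write("\n"); // every record, including the last, is terminated
    }
    if (footer != null) {
      out.write(footer);
      out.write("\n");
    }
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // Produces the bytes of "h\na\nb\nf\n".
    System.out.write(frame("h", Arrays.asList("a", "b"), "f"));
  }
}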
http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
new file mode 100644
index 0000000..4d9fa77
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+
+/**
+ * Implementation detail of {@link TextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
+ * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
+ * even if it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class TextSource extends FileBasedSource<String> {
+ TextSource(ValueProvider<String> fileSpec) {
+ super(fileSpec, 1L);
+ }
+
+ private TextSource(MatchResult.Metadata metadata, long start, long end) {
+ super(metadata, 1L, start, end);
+ }
+
+ @Override
+ protected FileBasedSource<String> createForSubrangeOfFile(
+ MatchResult.Metadata metadata,
+ long start,
+ long end) {
+ return new TextSource(metadata, start, end);
+ }
+
+ @Override
+ protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
+ return new TextBasedReader(this);
+ }
+
+ @Override
+ public Coder<String> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
+
+ /**
+ * A {@link FileBasedReader FileBasedReader}
+ * which can decode records delimited by newline characters.
+ *
+ * <p>See {@link TextSource} for further details.
+ */
+ @VisibleForTesting
+ static class TextBasedReader extends FileBasedReader<String> {
+ private static final int READ_BUFFER_SIZE = 8192;
+ private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+ private ByteString buffer;
+ private int startOfSeparatorInBuffer;
+ private int endOfSeparatorInBuffer;
+ private long startOfRecord;
+ private volatile long startOfNextRecord;
+ private volatile boolean eof;
+ private volatile boolean elementIsPresent;
+ private String currentValue;
+ private ReadableByteChannel inChannel;
+
+ private TextBasedReader(TextSource source) {
+ super(source);
+ buffer = ByteString.EMPTY;
+ }
+
+ @Override
+ protected long getCurrentOffset() throws NoSuchElementException {
+ if (!elementIsPresent) {
+ throw new NoSuchElementException();
+ }
+ return startOfRecord;
+ }
+
+ @Override
+ public long getSplitPointsRemaining() {
+ if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+ return isDone() ? 0 : 1;
+ }
+ return super.getSplitPointsRemaining();
+ }
+
+ @Override
+ public String getCurrent() throws NoSuchElementException {
+ if (!elementIsPresent) {
+ throw new NoSuchElementException();
+ }
+ return currentValue;
+ }
+
+ @Override
+ protected void startReading(ReadableByteChannel channel) throws IOException {
+ this.inChannel = channel;
+ // If the first offset is greater than zero, we need to skip bytes until we see our
+ // first separator.
+ if (getCurrentSource().getStartOffset() > 0) {
+ checkState(channel instanceof SeekableByteChannel,
+ "%s only supports reading from a SeekableByteChannel when given a start offset"
+ + " greater than 0.", TextSource.class.getSimpleName());
+ long requiredPosition = getCurrentSource().getStartOffset() - 1;
+ ((SeekableByteChannel) channel).position(requiredPosition);
+ findSeparatorBounds();
+ buffer = buffer.substring(endOfSeparatorInBuffer);
+ startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
+ endOfSeparatorInBuffer = 0;
+ startOfSeparatorInBuffer = 0;
+ }
+ }
+
+ /**
+ * Locates the start position and end position of the next delimiter. Will
+ * consume the channel until either EOF or the delimiter bounds are found.
+ *
+ * <p>This fills the buffer and updates the positions as follows:
+ * <pre>{@code
+ * ------------------------------------------------------
+ * | element bytes | delimiter bytes | unconsumed bytes |
+ * ------------------------------------------------------
+ * 0 start of end of buffer
+ * separator separator size
+ * in buffer in buffer
+ * }</pre>
+ */
+ private void findSeparatorBounds() throws IOException {
+ int bytePositionInBuffer = 0;
+ while (true) {
+ if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+ startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
+ break;
+ }
+
+ byte currentByte = buffer.byteAt(bytePositionInBuffer);
+
+ if (currentByte == '\n') {
+ startOfSeparatorInBuffer = bytePositionInBuffer;
+ endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+ break;
+ } else if (currentByte == '\r') {
+ startOfSeparatorInBuffer = bytePositionInBuffer;
+ endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+
+ if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+ currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+ if (currentByte == '\n') {
+ endOfSeparatorInBuffer += 1;
+ }
+ }
+ break;
+ }
+
+ // Move to the next byte in buffer.
+ bytePositionInBuffer += 1;
+ }
+ }
+
+ @Override
+ protected boolean readNextRecord() throws IOException {
+ startOfRecord = startOfNextRecord;
+ findSeparatorBounds();
+
+ // If we have reached EOF and consumed all of the buffer then we know
+ // that there are no more records.
+ if (eof && buffer.size() == 0) {
+ elementIsPresent = false;
+ return false;
+ }
+
+ decodeCurrentElement();
+ startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
+ return true;
+ }
+
+ /**
+ * Decodes the current element updating the buffer to only contain the unconsumed bytes.
+ *
+ * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
+ * {@code endOfSeparatorInBuffer}.
+ */
+ private void decodeCurrentElement() throws IOException {
+ ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
+ currentValue = dataToDecode.toStringUtf8();
+ elementIsPresent = true;
+ buffer = buffer.substring(endOfSeparatorInBuffer);
+ }
+
+ /**
+ * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
+ */
+ private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
+ // While we aren't at EOF and haven't fulfilled the minimum buffer capacity,
+ // attempt to read more bytes.
+ while (buffer.size() <= minCapacity && !eof) {
+ eof = inChannel.read(readBuffer) == -1;
+ readBuffer.flip();
+ buffer = buffer.concat(ByteString.copyFrom(readBuffer));
+ readBuffer.clear();
+ }
+ // Return true if we were able to honor the minimum buffer capacity request
+ return buffer.size() >= minCapacity;
+ }
+ }
+}
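To make the delimiter diagram above concrete, here is a standalone sketch of the scan that findSeparatorBounds performs, applied to a fully buffered byte array: LF and bare CR are one-byte delimiters, CRLF is a two-byte delimiter, and a buffer with no delimiter yields an undelimited final record. The real reader additionally refills its buffer from the channel mid-scan, which this sketch omits:

import java.nio.charset.StandardCharsets;

final class SeparatorScanSketch {
  /** Returns {startOfSeparator, endOfSeparator} within buf, as in the diagram above. */
  static int[] findSeparatorBounds(byte[] buf) {
    for (int i = 0; i < buf.length; i++) {
      if (buf[i] == '\n') {
        return new int[] {i, i + 1};           // bare LF
      }
      if (buf[i] == '\r') {
        if (i + 1 < buf.length && buf[i + 1] == '\n') {
          return new int[] {i, i + 2};         // CRLF
        }
        return new int[] {i, i + 1};           // bare CR
      }
    }
    return new int[] {buf.length, buf.length}; // EOF: last record is undelimited
  }

  public static void main(String[] args) {
    byte[] data = "one\r\ntwo".getBytes(StandardCharsets.UTF_8);
    int[] bounds = findSeparatorBounds(data);
    // element bytes = "one", delimiter bytes = "\r\n", unconsumed bytes = "two"
    System.out.println(bounds[0] + ".." + bounds[1]); // prints 3..5
  }
}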
http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 425e0d6..f30b52f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
import org.apache.beam.sdk.io.TextIO.CompressionType;
-import org.apache.beam.sdk.io.TextIO.TextSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -1064,7 +1063,7 @@ public class TextIOTest {
private TextSource prepareSource(byte[] data) throws IOException {
Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
Files.write(path, data);
- return new TextSource(path.toString());
+ return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
}
@Test
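The one-line test change above follows from the move: the top-level TextSource keeps only the ValueProvider constructor, so call sites with a concrete path wrap it in a StaticValueProvider. For example, at a hypothetical call site:

// 'path' is any concrete file path known at construction time.
TextSource source =
    new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));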
[4/7] beam git commit: Removes leftover no-op withoutValidation methods
Posted by jk...@apache.org.
Removes leftover no-op withoutValidation methods
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef4658a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef4658a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef4658a5
Branch: refs/heads/master
Commit: ef4658a5d09fd77eeee3ba696da6972b9958bfb4
Parents: c2c89ed
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:14:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../DataflowPipelineTranslatorTest.java | 2 +-
.../runners/dataflow/DataflowRunnerTest.java | 6 +--
.../java/org/apache/beam/sdk/io/TextIO.java | 41 --------------------
.../display/DisplayDataEvaluatorTest.java | 3 +-
4 files changed, 5 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 63e1166..fcd23cf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -766,7 +766,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
- pipeline.apply(TextIO.Read.from(new TestValueProvider()).withoutValidation());
+ pipeline.apply(TextIO.Read.from(new TestValueProvider()));
// Check that translation does not fail.
t.translate(
http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index e3c884b..480591e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -335,8 +335,8 @@ public class DataflowRunnerTest {
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
- .apply(TextIO.Read.from(options.getInput()).withoutValidation())
- .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
+ .apply(TextIO.Read.from(options.getInput()))
+ .apply(TextIO.Write.to(options.getOutput()));
}
/**
@@ -347,7 +347,7 @@ public class DataflowRunnerTest {
DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
- PCollection<String> unconsumed = p.apply(Read.from(options.getInput()).withoutValidation());
+ PCollection<String> unconsumed = p.apply(Read.from(options.getInput()));
DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
p.traverseTopologically(new PipelineVisitor.Defaults() {
http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 0947702..f735019 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -196,20 +196,6 @@ public class TextIO {
/**
* Returns a new transform for reading from text files that's like this one but
- * that has GCS path validation on pipeline creation disabled.
- *
- * <p>This can be useful in the case where the GCS input does not
- * exist at the pipeline creation time, but is expected to be
- * available at execution time.
- *
- * <p>Does not modify this object.
- */
- public Bound withoutValidation() {
- return new Bound(name, filepattern, compressionType);
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
* reads from input sources using the specified compression type.
*
* <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
@@ -370,18 +356,6 @@ public class TextIO {
}
/**
- * Returns a transform for writing to text files that has GCS path validation on
- * pipeline creation disabled.
- *
- * <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be available
- * at execution time.
- */
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
- }
-
- /**
* Returns a transform for writing to text files that adds a header string to the files
* it writes. Note that a newline character will be added after the header.
*
@@ -579,21 +553,6 @@ public class TextIO {
}
/**
- * Returns a transform for writing to text files that's like this one but
- * that has GCS output path validation on pipeline creation disabled.
- *
- * <p>This can be useful in the case where the GCS output location does
- * not exist at the pipeline creation time, but is expected to be
- * available at execution time.
- *
- * <p>Does not modify this object.
- */
- public Bound withoutValidation() {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
* Returns a transform for writing to text files that adds a header string to the files
* it writes. Note that a newline character will be added after the header.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index feff333..6937405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -95,8 +95,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testSourceTransform() {
PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.Read
- .from("foo.*")
- .withoutValidation();
+ .from("foo.*");
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(myTransform);
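The pattern withoutValidation used to support remains expressible without it: an input that only exists at execution time is declared as a ValueProvider option and passed to the transform directly, exactly as the updated tests do. A hedged sketch, with an illustrative options interface:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

/** Illustrative options interface; only the getInput/setInput names are assumed. */
public interface RuntimeInputOptions extends PipelineOptions {
  ValueProvider<String> getInput();
  void setInput(ValueProvider<String> value);
}

// At construction time the provider may be unset; it resolves at run time:
// p.apply(TextIO.Read.from(options.getInput()));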
[6/7] beam git commit: Removes TextIO.Write.Bound
Posted by jk...@apache.org.
Removes TextIO.Write.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f5098dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f5098dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f5098dd
Branch: refs/heads/master
Commit: 4f5098ddf641f97417955ea33f429385d6fce384
Parents: 987b4e6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:28:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../examples/common/WriteOneFilePerWindow.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../beam/examples/cookbook/DistinctExample.java | 2 +-
.../beam/examples/cookbook/JoinExamples.java | 2 +-
.../beam/examples/MinimalWordCountJava8.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../beam/runners/flink/ReadSourceITCase.java | 2 +-
.../flink/ReadSourceStreamingITCase.java | 2 +-
.../flink/streaming/GroupByNullKeyTest.java | 2 +-
.../streaming/TopWikipediaSessionsITCase.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 4 +-
.../runners/dataflow/DataflowRunnerTest.java | 8 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 2 +-
.../beam/runners/spark/io/NumShardsTest.java | 3 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
.../apache/beam/sdk/io/ShardNameTemplate.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 553 +++++++------------
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 20 +-
.../beam/sdk/runners/TransformTreeTest.java | 2 +-
.../org/apache/beam/sdk/util/NameUtilsTest.java | 2 +-
.../org/apache/beam/sdk/values/PDoneTest.java | 2 +-
28 files changed, 249 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
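Before the per-file hunks, the user-facing shape of this commit: the TextIO.Write class becomes the TextIO.write() factory method, matching the TextIO.read() spelling already visible in the surrounding context. A before/after sketch of a typical call site:

// Before this commit:
// p.apply(TextIO.read().from("gs://bucket/input*.txt"))
//  .apply(TextIO.Write.to("gs://bucket/output"));

// After:
p.apply(TextIO.read().from("gs://bucket/input*.txt"))
 .apply(TextIO.write().to("gs://bucket/output"));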
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 4a0c1bb..5ac8080 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -111,7 +111,7 @@ public class MinimalWordCount {
// formatted strings) to a series of text files.
//
// By default, it will write to a set of files with names like wordcount-00001-of-00005
- .apply(TextIO.Write.to("wordcounts"));
+ .apply(TextIO.write().to("wordcounts"));
// Run the pipeline.
p.run().waitUntilFinish();
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e331a86..bfa7eb3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -179,7 +179,7 @@ public class WordCount {
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 6609828..461b46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -48,7 +48,7 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
@Override
public PDone expand(PCollection<String> input) {
return input.apply(
- TextIO.Write
+ TextIO.write()
.to(new PerWindowFiles(filenamePrefix))
.withWindowedWrites()
.withNumShards(3));
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 1ef69c0..6fd9755 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -400,7 +400,7 @@ public class TfIdf {
c.element().getValue().getValue()));
}
}))
- .apply(TextIO.Write
+ .apply(TextIO.write()
.to(output)
.withSuffix(".csv"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index bb8c8bc..478e2dc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -210,7 +210,7 @@ public class TopWikipediaSessions {
p.apply(TextIO.read().from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
- .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
+ .apply("Write", TextIO.write().withoutSharding().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 20c8fa0..bb16528 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -89,7 +89,7 @@ public class DistinctExample {
p.apply("ReadLines", TextIO.read().from(options.getInput()))
.apply(Distinct.<String>create())
- .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
+ .apply("DedupedShakespeare", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 05a3ad3..d1fffb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -169,7 +169,7 @@ public class JoinExamples {
PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
- formattedResults.apply(TextIO.Write.to(options.getOutput()));
+ formattedResults.apply(TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6dabadc..85c291d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -65,7 +65,7 @@ public class MinimalWordCountJava8 {
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
// CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
- .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+ .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 689005a..e071b4e 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -71,7 +71,7 @@ public class MinimalWordCountJava8Test implements Serializable {
.apply(MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
- .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
+ .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
}
private GcsUtil buildMockGcsUtil() throws IOException {
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b0fab0b..83af61b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -110,7 +110,7 @@ public class WordCountTest {
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()))
;
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 960640c..53d2ba3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -86,7 +86,7 @@ public class WriteWithShardingFactoryTest {
String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
// TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
// resharding should be automatically applied
- p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
+ p.apply(Create.of(strs)).apply(TextIO.write().to(targetLocation));
p.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 65d198e..5985da8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -76,7 +76,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
}
}));
- result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+ result.apply(TextIO.write().to(new URI(resultPath).getPath() + "/part"));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index 4f597c3..0707c21 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -65,7 +65,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
c.output(c.element().toString());
}
}))
- .apply(TextIO.Write.to(resultPath));
+ .apply(TextIO.write().to(resultPath));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 82d9f4f..2bd8e72 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -123,7 +123,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
c.output(str.toString());
}
}));
- output.apply(TextIO.Write.to(resultPath));
+ output.apply(TextIO.write().to(resultPath));
p.run();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9e6bba8..28335e3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -126,7 +126,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
}
}));
- format.apply(TextIO.Write.to(resultPath));
+ format.apply(TextIO.write().to(resultPath));
p.run();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d47da45..31c47b4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -138,7 +138,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
FileSystems.setDefaultConfigInWorkers(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(p);
@@ -525,7 +525,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
String stepName = "DoFn1";
pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(stepName, ParDo.of(new NoOpFn()))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
Job job =
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7261fe9..d011994 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -172,7 +172,7 @@ public class DataflowRunnerTest {
Pipeline p = Pipeline.create(options);
p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+ .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultConfigInWorkers(options);
@@ -335,7 +335,7 @@ public class DataflowRunnerTest {
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
.apply(TextIO.read().from(options.getInput()))
- .apply(TextIO.Write.to(options.getOutput()));
+ .apply(TextIO.write().to(options.getOutput()));
}
/**
@@ -587,7 +587,7 @@ public class DataflowRunnerTest {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
- .apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
+ .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
@@ -613,7 +613,7 @@ public class DataflowRunnerTest {
public void testMultiSlashGcsFileWritePath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
- pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
+ pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file"));
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("consecutive slashes");
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 2bcf140..0779bd5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -128,7 +128,7 @@ public class WordCount {
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
- .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+ .apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ce52b90..e43bc4e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -88,7 +88,7 @@ public class SparkRunnerDebuggerTest {
wordCounts
.apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
+ .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index c936ed3..5021744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -73,7 +73,8 @@ public class NumShardsTest {
PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
PCollection<String> output = inputWords.apply(new WordCount.CountWords())
.apply(MapElements.via(new WordCount.FormatAsTextFn()));
- output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+ output.apply(
+ TextIO.write().to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
p.run().waitUntilFinish();
int count = 0;
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7cb9386..d4c46cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
* .apply(new Count<String>());
* PCollection<String> formattedWordCounts =
* wordCounts.apply(ParDo.of(new FormatCounts()));
- * formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt"));
+ * formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
*
* // PTransforms aren't executed when they're applied; rather, they're
* // just added to the Pipeline. Once the whole Pipeline of PTransforms
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
index 7f48a5c..cc85242 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
@@ -45,7 +45,7 @@ package org.apache.beam.sdk.io;
*
* <pre>{@code
* pipeline.apply(
- * TextIO.Write.to("gs://bucket/path")
+ * TextIO.write().to("gs://bucket/path")
* .withShardNameTemplate("-SS-of-NN")
* .withSuffix(".txt"))
* }</pre>
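For readers skimming the diff: with that template, a rough sketch of the resulting
file names (the prefix, suffix, and shard count here are illustrative, not taken
from any real pipeline) would be:

  // Writing with an explicit shard count of 3 would yield names like:
  //   gs://bucket/path-00-of-03.txt
  //   gs://bucket/path-01-of-03.txt
  //   gs://bucket/path-02-of-03.txt
  PCollection<String> lines = ...;
  lines.apply(
      TextIO.write().to("gs://bucket/path")
          .withShardNameTemplate("-SS-of-NN")
          .withSuffix(".txt")
          .withNumShards(3));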
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f8670a6..2d82572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -86,9 +86,9 @@ import org.apache.beam.sdk.values.PDone;
*
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner -
- * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
* preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
* runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
* set, and unique windows and triggers must produce unique filenames.
*
@@ -99,11 +99,11 @@ import org.apache.beam.sdk.values.PDone;
* <pre>{@code
* // A simple Write to a local file (only runs locally):
* PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"));
*
* // Same as above, only with Gzip compression:
* PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt")
* .withSuffix(".txt")
* .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
* }</pre>
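A hedged sketch of the windowed-write path described in the javadoc above; the
window size, shard count, and the PerWindowPolicy class are illustrative
assumptions rather than part of this change:

  PCollection<String> lines = ...;  // e.g. lines from an unbounded source
  lines
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
      .apply(TextIO.write()
          .to(new PerWindowPolicy())  // a user-supplied FilenamePolicy (hypothetical)
          .withWindowedWrites()
          .withNumShards(3));         // windowed writes require an explicit shard count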
@@ -117,6 +117,15 @@ public class TextIO {
return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
}
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} to a text file (or
+ * multiple text files matching a sharding pattern), with each
+ * element of the input collection encoded into its own line.
+ */
+ public static Write write() {
+ return new Write();
+ }
+
/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@@ -227,49 +236,105 @@ public class TextIO {
/////////////////////////////////////////////////////////////////////////////
- /**
- * A {@link PTransform} that writes a {@link PCollection} to text file (or
- * multiple text files matching a sharding pattern), with each
- * element of the input collection encoded into its own line.
- */
- public static class Write {
+ /** Implementation of {@link #write}. */
+ public static class Write extends PTransform<PCollection<String>, PDone> {
+ private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+ /** The prefix of each file written, combined with suffix and shardTemplate. */
+ private final ValueProvider<String> filenamePrefix;
+ /** The suffix of each file written, combined with prefix and shardTemplate. */
+ private final String filenameSuffix;
+
+ /** An optional header to add to each file. */
+ @Nullable private final String header;
+
+ /** An optional footer to add to each file. */
+ @Nullable private final String footer;
+
+ /** Requested number of shards. 0 for automatic. */
+ private final int numShards;
+
+ /** The shard template of each file written, combined with prefix and suffix. */
+ private final String shardTemplate;
+
+ /** A policy for naming output files. */
+ private final FilenamePolicy filenamePolicy;
+
+ /** Whether to write windowed output files. */
+ private boolean windowedWrites;
+
+ /**
+ * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+ * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ */
+ private final WritableByteChannelFactory writableByteChannelFactory;
+
+ private Write() {
+ this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
+ FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
+ }
+
+ private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+ @Nullable String header, @Nullable String footer, int numShards,
+ String shardTemplate,
+ WritableByteChannelFactory writableByteChannelFactory,
+ FilenamePolicy filenamePolicy,
+ boolean windowedWrites) {
+ super(name);
+ this.header = header;
+ this.footer = footer;
+ this.filenamePrefix = filenamePrefix;
+ this.filenameSuffix = filenameSuffix;
+ this.numShards = numShards;
+ this.shardTemplate = shardTemplate;
+ this.writableByteChannelFactory =
+ firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+ this.filenamePolicy = filenamePolicy;
+ this.windowedWrites = windowedWrites;
+ }
/**
- * Returns a transform for writing to text files that writes to the file(s)
- * with the given prefix. This can be a local filename
+ * Writes to text files with the given prefix. This can be a local filename
* (if running locally), or a Google Cloud Storage filename of
* the form {@code "gs://<bucket>/<filepath>"}
* (if running locally or using remote execution).
*
* <p>The files written will begin with this prefix, followed by
- * a shard identifier (see {@link Bound#withNumShards(int)}, and end
- * in a common extension, if given by {@link Bound#withSuffix(String)}.
+ * a shard identifier (see {@link #withNumShards(int)}), and end
+ * in a common extension, if given by {@link #withSuffix(String)}.
*/
- public static Bound to(String prefix) {
- return new Bound().to(prefix);
+ public Write to(String filenamePrefix) {
+ validateOutputComponent(filenamePrefix);
+ return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+ header, footer, numShards, shardTemplate,
+ writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- public static Bound to(FilenamePolicy filenamePolicy) {
- return new Bound().to(filenamePolicy);
-
+ /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
+ public Write to(ValueProvider<String> filenamePrefix) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- /**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
- */
- public static Bound to(ValueProvider<String> prefix) {
- return new Bound().to(prefix);
+
+ /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
+ public Write to(FilenamePolicy filenamePolicy) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that appends the specified suffix
- * to the created files.
+ * Writes to the file(s) with the given filename suffix.
+ *
+ * @see ShardNameTemplate
*/
- public static Bound withSuffix(String nameExtension) {
- return new Bound().withSuffix(nameExtension);
+ public Write withSuffix(String nameExtension) {
+ validateOutputComponent(nameExtension);
+ return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that uses the provided shard count.
+ * Uses the provided shard count.
*
* <p>Constraining the number of shards is likely to reduce
* the performance of a pipeline. Setting this value is not recommended
@@ -277,371 +342,169 @@ public class TextIO {
*
* @param numShards the number of shards to use, or 0 to let the system
* decide.
+ * @see ShardNameTemplate
*/
- public static Bound withNumShards(int numShards) {
- return new Bound().withNumShards(numShards);
+ public Write withNumShards(int numShards) {
+ checkArgument(numShards >= 0);
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that uses the given shard name
- * template.
+ * Uses the given shard name template.
*
- * <p>See {@link ShardNameTemplate} for a description of shard templates.
+ * @see ShardNameTemplate
*/
- public static Bound withShardNameTemplate(String shardTemplate) {
- return new Bound().withShardNameTemplate(shardTemplate);
+ public Write withShardNameTemplate(String shardTemplate) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that forces a single file as
- * output.
+ * Forces a single file as output.
+ *
+ * <p>Constraining the number of shards is likely to reduce
+ * the performance of a pipeline. Using this setting is not recommended
+ * unless you truly require a single output file.
+ *
+ * <p>This is a shortcut for
+ * {@code .withNumShards(1).withShardNameTemplate("")}
*/
- public static Bound withoutSharding() {
- return new Bound().withoutSharding();
+ public Write withoutSharding() {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
+ writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that adds a header string to the files
- * it writes. Note that a newline character will be added after the header.
+ * Adds a header string to each file. A newline after the header is added automatically.
*
* <p>A {@code null} value will clear any previously configured header.
- *
- * @param header the string to be added as file header
*/
- public static Bound withHeader(@Nullable String header) {
- return new Bound().withHeader(header);
+ public Write withHeader(@Nullable String header) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
- * Returns a transform for writing to text files that adds a footer string to the files
- * it writes. Note that a newline character will be added after the header.
+ * Adds a footer string to each file. A newline after the footer is added automatically.
*
* <p>A {@code null} value will clear any previously configured footer.
- *
- * @param footer the string to be added as file footer
*/
- public static Bound withFooter(@Nullable String footer) {
- return new Bound().withFooter(footer);
+ public Write withFooter(@Nullable String footer) {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
/**
* Returns a transform for writing to text files like this one but that has the given
- * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
- * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+ * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+ * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
*
* <p>A {@code null} value will reset the value to the default value mentioned above.
- *
- * @param writableByteChannelFactory the factory to be used during output
*/
- public static Bound withWritableByteChannelFactory(
+ public Write withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
- return new Bound().withWritableByteChannelFactory(writableByteChannelFactory);
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
}
- // TODO: appendingNewlines, etc.
-
- /**
- * A PTransform that writes a bounded PCollection to a text file (or
- * multiple text files matching a sharding pattern), with each
- * PCollection element being encoded into its own line.
- */
- public static class Bound extends PTransform<PCollection<String>, PDone> {
- private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
- /** The prefix of each file written, combined with suffix and shardTemplate. */
- private final ValueProvider<String> filenamePrefix;
- /** The suffix of each file written, combined with prefix and shardTemplate. */
- private final String filenameSuffix;
-
- /** An optional header to add to each file. */
- @Nullable private final String header;
-
- /** An optional footer to add to each file. */
- @Nullable private final String footer;
-
- /** Requested number of shards. 0 for automatic. */
- private final int numShards;
-
- /** The shard template of each file written, combined with prefix and suffix. */
- private final String shardTemplate;
-
- /** A policy for naming output files. */
- private final FilenamePolicy filenamePolicy;
-
- /** Whether to write windowed output files. */
- private boolean windowedWrites;
-
- /**
- * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
- * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
- */
- private final WritableByteChannelFactory writableByteChannelFactory;
-
- private Bound() {
- this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
- FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
- }
-
- private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
- @Nullable String header, @Nullable String footer, int numShards,
- String shardTemplate,
- WritableByteChannelFactory writableByteChannelFactory,
- FilenamePolicy filenamePolicy,
- boolean windowedWrites) {
- super(name);
- this.header = header;
- this.footer = footer;
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.writableByteChannelFactory =
- firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
- this.filenamePolicy = filenamePolicy;
- this.windowedWrites = windowedWrites;
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that writes to the file(s) with the given filename prefix.
- *
- * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
- *
- * <p>Does not modify this object.
- */
- public Bound to(String filenamePrefix) {
- validateOutputComponent(filenamePrefix);
- return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
- header, footer, numShards, shardTemplate,
- writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Like {@link #to(String)}, but with a {@link ValueProvider}.
- */
- public Bound to(ValueProvider<String> filenamePrefix) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
- */
- public Bound to(FilenamePolicy filenamePolicy) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that that's like this one but
- * that writes to the file(s) with the given filename suffix.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withSuffix(String nameExtension) {
- validateOutputComponent(nameExtension);
- return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that uses the provided shard count.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Setting this value is not recommended
- * unless you require a specific number of output files.
- *
- * <p>Does not modify this object.
- *
- * @param numShards the number of shards to use, or 0 to let the system
- * decide.
- * @see ShardNameTemplate
- */
- public Bound withNumShards(int numShards) {
- checkArgument(numShards >= 0);
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that uses the given shard name template.
- *
- * <p>Does not modify this object.
- *
- * @see ShardNameTemplate
- */
- public Bound withShardNameTemplate(String shardTemplate) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
-
- /**
- * Returns a transform for writing to text files that's like this one but
- * that forces a single file as output.
- *
- * <p>Constraining the number of shards is likely to reduce
- * the performance of a pipeline. Using this setting is not recommended
- * unless you truly require a single output file.
- *
- * <p>This is a shortcut for
- * {@code .withNumShards(1).withShardNameTemplate("")}
- *
- * <p>Does not modify this object.
- */
- public Bound withoutSharding() {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
- writableByteChannelFactory, filenamePolicy, windowedWrites);
- }
+ public Write withWindowedWrites() {
+ return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+ shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+ }
- /**
- * Returns a transform for writing to text files that adds a header string to the files
- * it writes. Note that a newline character will be added after the header.
- *
- * <p>A {@code null} value will clear any previously configured header.
- *
- * <p>Does not modify this object.
- *
- * @param header the string to be added as file header
- */
- public Bound withHeader(@Nullable String header) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ @Override
+ public PDone expand(PCollection<String> input) {
+ if (filenamePolicy == null && filenamePrefix == null) {
+ throw new IllegalStateException(
+ "need to set the filename prefix of an TextIO.Write transform");
}
-
- /**
- * Returns a transform for writing to text files that adds a footer string to the files
- * it writes. Note that a newline character will be added after the header.
- *
- * <p>A {@code null} value will clear any previously configured footer.
- *
- * <p>Does not modify this object.
- *
- * @param footer the string to be added as file footer
- */
- public Bound withFooter(@Nullable String footer) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ if (filenamePolicy != null && filenamePrefix != null) {
+ throw new IllegalStateException(
+ "cannot set both a filename policy and a filename prefix");
}
-
- /**
- * Returns a transform for writing to text files like this one but that has the given
- * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
- * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
- *
- * <p>A {@code null} value will reset the value to the default value mentioned above.
- *
- * <p>Does not modify this object.
- *
- * @param writableByteChannelFactory the factory to be used during output
- */
- public Bound withWritableByteChannelFactory(
- WritableByteChannelFactory writableByteChannelFactory) {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ WriteFiles<String> write = null;
+ if (filenamePolicy != null) {
+ write = WriteFiles.to(
+ new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+ } else {
+ write = WriteFiles.to(
+ new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+ writableByteChannelFactory));
}
-
- public Bound withWindowedWrites() {
- return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+ if (getNumShards() > 0) {
+ write = write.withNumShards(getNumShards());
}
-
- @Override
- public PDone expand(PCollection<String> input) {
- if (filenamePolicy == null && filenamePrefix == null) {
- throw new IllegalStateException(
- "need to set the filename prefix of an TextIO.Write transform");
- }
- if (filenamePolicy != null && filenamePrefix != null) {
- throw new IllegalStateException(
- "cannot set both a filename policy and a filename prefix");
- }
- WriteFiles<String> write = null;
- if (filenamePolicy != null) {
- write = WriteFiles.to(
- new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
- } else {
- write = WriteFiles.to(
- new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
- writableByteChannelFactory));
- }
- if (getNumShards() > 0) {
- write = write.withNumShards(getNumShards());
- }
- if (windowedWrites) {
- write = write.withWindowedWrites();
- }
- return input.apply("WriteFiles", write);
+ if (windowedWrites) {
+ write = write.withWindowedWrites();
}
+ return input.apply("WriteFiles", write);
+ }
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
- String prefixString = "";
- if (filenamePrefix != null) {
- prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
- }
- builder
- .addIfNotNull(DisplayData.item("filePrefix", prefixString)
- .withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
- .withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
- .withLabel("Output Shard Name Template"),
- DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("numShards", numShards)
- .withLabel("Maximum Output Shards"), 0)
- .addIfNotNull(DisplayData.item("fileHeader", header)
- .withLabel("File Header"))
- .addIfNotNull(DisplayData.item("fileFooter", footer)
- .withLabel("File Footer"))
- .add(DisplayData
- .item("writableByteChannelFactory", writableByteChannelFactory.toString())
- .withLabel("Compression/Transformation Type"));
+ String prefixString = "";
+ if (filenamePrefix != null) {
+ prefixString = filenamePrefix.isAccessible()
+ ? filenamePrefix.get() : filenamePrefix.toString();
}
+ builder
+ .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+ .withLabel("Output File Prefix"))
+ .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .withLabel("Output File Suffix"), "")
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .withLabel("Output Shard Name Template"),
+ DEFAULT_SHARD_TEMPLATE)
+ .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .withLabel("Maximum Output Shards"), 0)
+ .addIfNotNull(DisplayData.item("fileHeader", header)
+ .withLabel("File Header"))
+ .addIfNotNull(DisplayData.item("fileFooter", footer)
+ .withLabel("File Footer"))
+ .add(DisplayData
+ .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+ .withLabel("Compression/Transformation Type"));
+ }
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
+ /**
+ * Returns the current shard name template string.
+ */
+ public String getShardNameTemplate() {
+ return shardTemplate;
+ }
- @Override
- protected Coder<Void> getDefaultOutputCoder() {
- return VoidCoder.of();
- }
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
- public String getFilenamePrefix() {
- return filenamePrefix.get();
- }
+ public String getFilenamePrefix() {
+ return filenamePrefix.get();
+ }
- public String getShardTemplate() {
- return shardTemplate;
- }
+ public String getShardTemplate() {
+ return shardTemplate;
+ }
- public int getNumShards() {
- return numShards;
- }
+ public int getNumShards() {
+ return numShards;
+ }
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
- @Nullable
- public String getHeader() {
- return header;
- }
+ @Nullable
+ public String getHeader() {
+ return header;
+ }
- @Nullable
- public String getFooter() {
- return footer;
- }
+ @Nullable
+ public String getFooter() {
+ return footer;
}
}
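The net effect for call sites is a move from the removed static entry points on
TextIO.Write to the new write() factory; a minimal before/after sketch (the path
and settings are illustrative):

  // Before this change:
  //   lines.apply(TextIO.Write.to("/path/out").withNumShards(3).withSuffix(".txt"));
  // After this change:
  lines.apply(TextIO.write()
      .to("/path/out")
      .withNumShards(3)
      .withSuffix(".txt"));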
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c65d7dd..3fc8e32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -29,7 +29,7 @@
* and {@code Write} transforms that persist PCollections to external storage:
* <pre> {@code
* PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.write().to("gs://my_bucket/path/to/numbers"));
* } </pre>
*/
package org.apache.beam.sdk.io;
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8a7965c..095b69f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -299,8 +299,8 @@ public class TextIOTest {
PCollection<String> input =
p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
- TextIO.Write.Bound write =
- TextIO.Write.to(baseFilename)
+ TextIO.Write write =
+ TextIO.write().to(baseFilename)
.withHeader(header)
.withFooter(footer);
@@ -463,7 +463,7 @@ public class TextIOTest {
final WritableByteChannelFactory writableByteChannelFactory =
new DrunkWritableByteChannelFactory();
- TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+ TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString())
.withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
@@ -483,7 +483,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayData() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withSuffix("bar")
.withShardNameTemplate("-SS-of-NN-")
@@ -504,7 +504,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayDataValidateThenHeader() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withHeader("myHeader");
@@ -515,7 +515,7 @@ public class TextIOTest {
@Test
public void testWriteDisplayDataValidateThenFooter() {
- TextIO.Write.Bound write = TextIO.Write
+ TextIO.Write write = TextIO.write()
.to("foo")
.withFooter("myFooter");
@@ -534,7 +534,7 @@ public class TextIOTest {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- TextIO.Write.Bound write = TextIO.Write.to(outputPath);
+ TextIO.Write write = TextIO.write().to(outputPath);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
assertThat("TextIO.Write should include the file prefix in its primitive display data",
@@ -553,7 +553,7 @@ public class TextIOTest {
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Output name components are not allowed to contain");
- input.apply(TextIO.Write.to(filename));
+ input.apply(TextIO.write().to(filename));
}
/** Options for testing. */
@@ -573,7 +573,7 @@ public class TextIOTest {
p
.apply(TextIO.read().from(options.getInput()))
- .apply(TextIO.Write.to(options.getOutput()));
+ .apply(TextIO.write().to(options.getOutput()));
}
@Test
@@ -826,7 +826,7 @@ public class TextIOTest {
@Test
public void testTextIOGetName() {
assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
- assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
+ assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 29d9774..6c3aba2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -124,7 +124,7 @@ public class TransformTreeTest {
p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
.apply(sample)
.apply(Flatten.<String>iterables())
- .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
+ .apply("WriteMyFile", TextIO.write().to(outputFile.getPath()));
final EnumSet<TransformsSeen> visited =
EnumSet.noneOf(TransformsSeen.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index c685a63..411f913 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -137,7 +137,7 @@ public class NameUtilsTest {
assertEquals(
"NameUtilsTest.SomeTransform",
NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
- assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
+ assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.class));
}
@AutoValue
http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 7c9d1d9..b07a5b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -71,7 +71,7 @@ public class PDoneTest {
return
begin
.apply(Create.of(LINES))
- .apply(TextIO.Write.to(filename));
+ .apply(TextIO.write().to(filename));
}
}
[2/7] beam git commit: Converts TextIO.Write to AutoValue
Posted by jk...@apache.org.
Converts TextIO.Write to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/681b5d64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/681b5d64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/681b5d64
Branch: refs/heads/master
Commit: 681b5d6400e1d00169a06860506a46053a226003
Parents: 4f5098d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:39:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 181 ++++++++-----------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 4 +-
3 files changed, 77 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index e43bc4e..ea058b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -100,7 +100,7 @@ public class SparkRunnerDebuggerTest {
+ ".SparkRunnerDebuggerTest$PlusOne())\n"
+ "sparkContext.union(...)\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
- + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>";
+ + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
(SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2d82572..90d56e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -123,7 +122,13 @@ public class TextIO {
* element of the input collection encoded into its own line.
*/
public static Write write() {
- return new Write();
+ return new AutoValue_TextIO_Write.Builder()
+ .setFilenameSuffix("")
+ .setNumShards(0)
+ .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+ .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
+ .setWindowedWrites(false)
+ .build();
}
/** Implementation of {@link #read}. */
@@ -237,60 +242,56 @@ public class TextIO {
/////////////////////////////////////////////////////////////////////////////
/** Implementation of {@link #write}. */
- public static class Write extends PTransform<PCollection<String>, PDone> {
+ @AutoValue
+ public abstract static class Write extends PTransform<PCollection<String>, PDone> {
private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
/** The prefix of each file written, combined with suffix and shardTemplate. */
- private final ValueProvider<String> filenamePrefix;
+ @Nullable abstract ValueProvider<String> getFilenamePrefix();
+
/** The suffix of each file written, combined with prefix and shardTemplate. */
- private final String filenameSuffix;
+ abstract String getFilenameSuffix();
/** An optional header to add to each file. */
- @Nullable private final String header;
+ @Nullable abstract String getHeader();
/** An optional footer to add to each file. */
- @Nullable private final String footer;
+ @Nullable abstract String getFooter();
/** Requested number of shards. 0 for automatic. */
- private final int numShards;
+ abstract int getNumShards();
/** The shard template of each file written, combined with prefix and suffix. */
- private final String shardTemplate;
+ abstract String getShardTemplate();
/** A policy for naming output files. */
- private final FilenamePolicy filenamePolicy;
+ @Nullable abstract FilenamePolicy getFilenamePolicy();
/** Whether to write windowed output files. */
- private boolean windowedWrites;
+ abstract boolean getWindowedWrites();
/**
* The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
* {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
*/
- private final WritableByteChannelFactory writableByteChannelFactory;
+ abstract WritableByteChannelFactory getWritableByteChannelFactory();
- private Write() {
- this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
- FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
- }
+ abstract Builder toBuilder();
- private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
- @Nullable String header, @Nullable String footer, int numShards,
- String shardTemplate,
- WritableByteChannelFactory writableByteChannelFactory,
- FilenamePolicy filenamePolicy,
- boolean windowedWrites) {
- super(name);
- this.header = header;
- this.footer = footer;
- this.filenamePrefix = filenamePrefix;
- this.filenameSuffix = filenameSuffix;
- this.numShards = numShards;
- this.shardTemplate = shardTemplate;
- this.writableByteChannelFactory =
- firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
- this.filenamePolicy = filenamePolicy;
- this.windowedWrites = windowedWrites;
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+ abstract Builder setFilenameSuffix(String filenameSuffix);
+ abstract Builder setHeader(String header);
+ abstract Builder setFooter(String footer);
+ abstract Builder setNumShards(int numShards);
+ abstract Builder setShardTemplate(String shardTemplate);
+ abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
+ abstract Builder setWindowedWrites(boolean windowedWrites);
+ abstract Builder setWritableByteChannelFactory(
+ WritableByteChannelFactory writableByteChannelFactory);
+
+ abstract Write build();
}
/**
@@ -305,21 +306,17 @@ public class TextIO {
*/
public Write to(String filenamePrefix) {
validateOutputComponent(filenamePrefix);
- return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
- header, footer, numShards, shardTemplate,
- writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return to(StaticValueProvider.of(filenamePrefix));
}
/** Like {@link #to(String)}, but with a {@link ValueProvider}. */
public Write to(ValueProvider<String> filenamePrefix) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setFilenamePrefix(filenamePrefix).build();
}
/** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
public Write to(FilenamePolicy filenamePolicy) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setFilenamePolicy(filenamePolicy).build();
}
/**
@@ -329,8 +326,7 @@ public class TextIO {
*/
public Write withSuffix(String nameExtension) {
validateOutputComponent(nameExtension);
- return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setFilenameSuffix(nameExtension).build();
}
/**
@@ -346,8 +342,7 @@ public class TextIO {
*/
public Write withNumShards(int numShards) {
checkArgument(numShards >= 0);
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setNumShards(numShards).build();
}
/**
@@ -356,8 +351,7 @@ public class TextIO {
* @see ShardNameTemplate
*/
public Write withShardNameTemplate(String shardTemplate) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setShardTemplate(shardTemplate).build();
}
/**
@@ -371,8 +365,7 @@ public class TextIO {
* {@code .withNumShards(1).withShardNameTemplate("")}
*/
public Write withoutSharding() {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
- writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return withNumShards(1).withShardNameTemplate("");
}
/**
@@ -381,8 +374,7 @@ public class TextIO {
* <p>A {@code null} value will clear any previously configured header.
*/
public Write withHeader(@Nullable String header) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setHeader(header).build();
}
/**
@@ -391,8 +383,7 @@ public class TextIO {
* <p>A {@code null} value will clear any previously configured footer.
*/
public Write withFooter(@Nullable String footer) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setFooter(footer).build();
}
/**
@@ -404,38 +395,47 @@ public class TextIO {
*/
public Write withWritableByteChannelFactory(
WritableByteChannelFactory writableByteChannelFactory) {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+ return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
}
public Write withWindowedWrites() {
- return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
- shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+ return toBuilder().setWindowedWrites(true).build();
}
@Override
public PDone expand(PCollection<String> input) {
- if (filenamePolicy == null && filenamePrefix == null) {
+ if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
throw new IllegalStateException(
"need to set the filename prefix of an TextIO.Write transform");
}
- if (filenamePolicy != null && filenamePrefix != null) {
+ if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
throw new IllegalStateException(
"cannot set both a filename policy and a filename prefix");
}
WriteFiles<String> write = null;
- if (filenamePolicy != null) {
- write = WriteFiles.to(
- new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+ if (getFilenamePolicy() != null) {
+ write =
+ WriteFiles.to(
+ new TextSink(
+ getFilenamePolicy(),
+ getHeader(),
+ getFooter(),
+ getWritableByteChannelFactory()));
} else {
- write = WriteFiles.to(
- new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
- writableByteChannelFactory));
+ write =
+ WriteFiles.to(
+ new TextSink(
+ getFilenamePrefix(),
+ getFilenameSuffix(),
+ getHeader(),
+ getFooter(),
+ getShardTemplate(),
+ getWritableByteChannelFactory()));
}
if (getNumShards() > 0) {
write = write.withNumShards(getNumShards());
}
- if (windowedWrites) {
+ if (getWindowedWrites()) {
write = write.withWindowedWrites();
}
return input.apply("WriteFiles", write);
@@ -446,66 +446,33 @@ public class TextIO {
super.populateDisplayData(builder);
String prefixString = "";
- if (filenamePrefix != null) {
- prefixString = filenamePrefix.isAccessible()
- ? filenamePrefix.get() : filenamePrefix.toString();
+ if (getFilenamePrefix() != null) {
+ prefixString = getFilenamePrefix().isAccessible()
+ ? getFilenamePrefix().get() : getFilenamePrefix().toString();
}
builder
.addIfNotNull(DisplayData.item("filePrefix", prefixString)
.withLabel("Output File Prefix"))
- .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+ .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
.withLabel("Output File Suffix"), "")
- .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+ .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
.withLabel("Output Shard Name Template"),
DEFAULT_SHARD_TEMPLATE)
- .addIfNotDefault(DisplayData.item("numShards", numShards)
+ .addIfNotDefault(DisplayData.item("numShards", getNumShards())
.withLabel("Maximum Output Shards"), 0)
- .addIfNotNull(DisplayData.item("fileHeader", header)
+ .addIfNotNull(DisplayData.item("fileHeader", getHeader())
.withLabel("File Header"))
- .addIfNotNull(DisplayData.item("fileFooter", footer)
+ .addIfNotNull(DisplayData.item("fileFooter", getFooter())
.withLabel("File Footer"))
.add(DisplayData
- .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+ .item("writableByteChannelFactory", getWritableByteChannelFactory().toString())
.withLabel("Compression/Transformation Type"));
}
- /**
- * Returns the current shard name template string.
- */
- public String getShardNameTemplate() {
- return shardTemplate;
- }
-
@Override
protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
}
-
- public String getFilenamePrefix() {
- return filenamePrefix.get();
- }
-
- public String getShardTemplate() {
- return shardTemplate;
- }
-
- public int getNumShards() {
- return numShards;
- }
-
- public String getFilenameSuffix() {
- return filenameSuffix;
- }
-
- @Nullable
- public String getHeader() {
- return header;
- }
-
- @Nullable
- public String getFooter() {
- return footer;
- }
}
/**
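Reduced to a minimal self-contained sketch, the pattern this conversion adopts
looks roughly as follows (class and property names are illustrative, not Beam's;
instances come from the generated AutoValue_... builder, as in write() above):

  @AutoValue
  abstract static class Example {
    abstract int getNumShards();     // each field becomes an abstract getter
    abstract Builder toBuilder();    // copies this instance back into a builder

    // Fluent "with" methods become copy-mutate-rebuild one-liners:
    Example withNumShards(int numShards) {
      return toBuilder().setNumShards(numShards).build();
    }

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setNumShards(int numShards);
      abstract Example build();
    }
  }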
http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 095b69f..425e0d6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -315,7 +315,7 @@ public class TextIOTest {
p.run();
assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
- write.getShardNameTemplate());
+ write.getShardTemplate());
}
public static void assertOutputFiles(
@@ -478,7 +478,7 @@ public class TextIOTest {
drunkElems.add(elem);
}
assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir,
- outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate());
+ outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate());
}
@Test
[7/7] beam git commit: This closes #2774
Posted by jk...@apache.org.
This closes #2774
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3161904d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3161904d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3161904d
Branch: refs/heads/master
Commit: 3161904d97e9eb4ec6eb354fb54d8d0e4e5733eb
Parents: c2c89ed 7b725c2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 2 12:20:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:24 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../apache/beam/examples/MinimalWordCount.java | 4 +-
.../apache/beam/examples/WindowedWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 4 +-
.../examples/common/WriteOneFilePerWindow.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/StreamingWordExtract.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 4 +-
.../examples/complete/TopWikipediaSessions.java | 4 +-
.../examples/complete/TrafficMaxLaneFlow.java | 2 +-
.../beam/examples/complete/TrafficRoutes.java | 2 +-
.../beam/examples/cookbook/DistinctExample.java | 4 +-
.../beam/examples/cookbook/JoinExamples.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../beam/examples/MinimalWordCountJava8.java | 4 +-
.../examples/complete/game/HourlyTeamScore.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 4 +-
.../runners/apex/examples/WordCountTest.java | 4 +-
.../direct/WriteWithShardingFactoryTest.java | 2 +-
.../beam/runners/flink/ReadSourceITCase.java | 2 +-
.../flink/ReadSourceStreamingITCase.java | 2 +-
.../flink/streaming/GroupByNullKeyTest.java | 2 +-
.../streaming/TopWikipediaSessionsITCase.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 16 +-
.../runners/dataflow/DataflowRunnerTest.java | 23 +-
.../beam/runners/spark/examples/WordCount.java | 4 +-
.../runners/spark/SparkRunnerDebuggerTest.java | 4 +-
.../beam/runners/spark/io/NumShardsTest.java | 3 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 6 +-
.../apache/beam/sdk/io/ShardNameTemplate.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 1159 +++++-------------
.../java/org/apache/beam/sdk/io/TextSink.java | 139 +++
.../java/org/apache/beam/sdk/io/TextSource.java | 236 ++++
.../org/apache/beam/sdk/io/package-info.java | 2 +-
.../java/org/apache/beam/sdk/io/TextIOTest.java | 61 +-
.../beam/sdk/runners/TransformTreeTest.java | 6 +-
.../display/DisplayDataEvaluatorTest.java | 5 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../org/apache/beam/sdk/util/NameUtilsTest.java | 2 +-
.../org/apache/beam/sdk/values/PDoneTest.java | 2 +-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 3 +-
42 files changed, 753 insertions(+), 986 deletions(-)
----------------------------------------------------------------------
[3/7] beam git commit: Removes TextIO.Read.Bound
Posted by jk...@apache.org.
Removes TextIO.Read.Bound
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96315203
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96315203
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96315203
Branch: refs/heads/master
Commit: 96315203284ff60b10210d73b81b65ea0a395544
Parents: ef4658a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:06:46 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../beam/examples/DebuggingWordCount.java | 2 +-
.../apache/beam/examples/MinimalWordCount.java | 2 +-
.../apache/beam/examples/WindowedWordCount.java | 2 +-
.../org/apache/beam/examples/WordCount.java | 2 +-
.../beam/examples/complete/AutoComplete.java | 2 +-
.../examples/complete/StreamingWordExtract.java | 2 +-
.../apache/beam/examples/complete/TfIdf.java | 2 +-
.../examples/complete/TopWikipediaSessions.java | 2 +-
.../examples/complete/TrafficMaxLaneFlow.java | 2 +-
.../beam/examples/complete/TrafficRoutes.java | 2 +-
.../beam/examples/cookbook/DistinctExample.java | 2 +-
.../beam/examples/cookbook/TriggerExample.java | 2 +-
.../beam/examples/MinimalWordCountJava8.java | 2 +-
.../examples/complete/game/HourlyTeamScore.java | 2 +-
.../beam/examples/complete/game/UserScore.java | 2 +-
.../examples/MinimalWordCountJava8Test.java | 2 +-
.../runners/apex/examples/WordCountTest.java | 2 +-
.../DataflowPipelineTranslatorTest.java | 12 +-
.../runners/dataflow/DataflowRunnerTest.java | 15 +-
.../beam/runners/spark/examples/WordCount.java | 2 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 4 +-
.../java/org/apache/beam/sdk/io/TextIO.java | 247 ++++++++-----------
.../java/org/apache/beam/sdk/io/TextIOTest.java | 34 +--
.../beam/sdk/runners/TransformTreeTest.java | 4 +-
.../display/DisplayDataEvaluatorTest.java | 2 +-
.../sdk/transforms/windowing/WindowingTest.java | 2 +-
.../beam/sdk/io/hdfs/HadoopFileSystemTest.java | 3 +-
27 files changed, 158 insertions(+), 201 deletions(-)
----------------------------------------------------------------------
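The change this commit asks of user code is mechanical; below is a minimal, self-contained sketch of the before and after (the pipeline wiring and gs:// paths are illustrative, not taken from the diff):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadMigrationSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Before this commit: TextIO.Read was a utility class whose static from()
    // returned a nested TextIO.Read.Bound transform.
    //   p.apply("ReadLines", TextIO.Read.from("gs://bucket/input*.txt"));

    // After this commit: TextIO.read() returns the Read transform itself, and
    // from() returns a configured copy, so the Bound nested class is gone.
    PCollection<String> lines =
        p.apply("ReadLines", TextIO.read().from("gs://bucket/input*.txt"));

    // The write side is untouched by this commit.
    lines.apply("WriteLines", TextIO.Write.to("gs://bucket/output"));
    p.run();
  }
}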
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index e6e3a92..06af209 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -136,7 +136,7 @@ public class DebuggingWordCount {
Pipeline p = Pipeline.create(options);
PCollection<KV<String, Long>> filteredWords =
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new WordCount.CountWords())
.apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index cf72672..4a0c1bb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -75,7 +75,7 @@ public class MinimalWordCount {
// the input text (a set of Shakespeare's texts).
// This example reads a public data set consisting of the complete works of Shakespeare.
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index d88de54..5c64c53 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -178,7 +178,7 @@ public class WindowedWordCount {
*/
PCollection<String> input = pipeline
/** Read from the GCS file. */
- .apply(TextIO.Read.from(options.getInputFile()))
+ .apply(TextIO.read().from(options.getInputFile()))
// Concept #2: Add an element timestamp, using an artificial time just to show windowing.
// See AddTimestampFn for more detail on this.
.apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index b64d2c1..e331a86 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -176,7 +176,7 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e6621ce..bd69855 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -469,7 +469,7 @@ public class AutoComplete {
// Create the pipeline.
Pipeline p = Pipeline.create(options);
PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
- .apply(TextIO.Read.from(options.getInputFile()))
+ .apply(TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractHashtags()))
.apply(Window.<String>into(windowFn))
.apply(ComputeTopCompletions.top(10, options.getRecursive()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 21a9849..f35d67a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -132,7 +132,7 @@ public class StreamingWordExtract {
.append(options.getBigQueryTable())
.toString();
pipeline
- .apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ .apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWords()))
.apply(ParDo.of(new Uppercase()))
.apply(ParDo.of(new StringToRowConverter()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 2e1be90..1ef69c0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -179,7 +179,7 @@ public class TfIdf {
}
PCollection<KV<URI, String>> oneUriToLines = pipeline
- .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
+ .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
.apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
urisToLines = urisToLines.and(oneUriToLines);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 4c07ca4..bb8c8bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -207,7 +207,7 @@ public class TopWikipediaSessions {
double samplingThreshold = 0.1;
- p.apply(TextIO.Read.from(options.getInput()))
+ p.apply(TextIO.read().from(options.getInput()))
.apply(MapElements.via(new ParseTableRowJson()))
.apply(new ComputeTopSessions(samplingThreshold))
.apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index e57da93..d7c933e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -291,7 +291,7 @@ public class TrafficMaxLaneFlow {
@Override
public PCollection<String> expand(PBegin begin) {
return begin
- .apply(TextIO.Read.from(inputFile))
+ .apply(TextIO.read().from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index b1f938b..c9ba18c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -301,7 +301,7 @@ public class TrafficRoutes {
@Override
public PCollection<String> expand(PBegin begin) {
return begin
- .apply(TextIO.Read.from(inputFile))
+ .apply(TextIO.read().from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 9670b7f..20c8fa0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -87,7 +87,7 @@ public class DistinctExample {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadLines", TextIO.Read.from(options.getInput()))
+ p.apply("ReadLines", TextIO.read().from(options.getInput()))
.apply(Distinct.<String>create())
.apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 49d5eda..e7596aa 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -446,7 +446,7 @@ public class TriggerExample {
options.getBigQueryDataset(), options.getBigQueryTable());
PCollectionList<TableRow> resultList = pipeline
- .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
+ .apply("ReadMyFile", TextIO.read().from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(ParDo.of(new ExtractFlowInfo()))
.apply(new CalculateTotalFlow(options.getWindowDuration()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6badb75..6dabadc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@ public class MinimalWordCountJava8 {
Pipeline p = Pipeline.create(options);
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index b905d61..3f1ffb0 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -155,7 +155,7 @@ public class HourlyTeamScore extends UserScore {
final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
// Read 'gaming' events from a text file.
- pipeline.apply(TextIO.Read.from(options.getInput()))
+ pipeline.apply(TextIO.read().from(options.getInput()))
// Parse the incoming data.
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 0adaabc..c136c2e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -226,7 +226,7 @@ public class UserScore {
Pipeline pipeline = Pipeline.create(options);
// Read events from a text file and parse them.
- pipeline.apply(TextIO.Read.from(options.getInput()))
+ pipeline.apply(TextIO.read().from(options.getInput()))
.apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
// Extract and sum username/score pairs from the event data.
.apply("ExtractUserScore", new ExtractAndSumScore("user"))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f3becf9..689005a 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -62,7 +62,7 @@ public class MinimalWordCountJava8Test implements Serializable {
public void testMinimalWordCountJava8() throws Exception {
p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
- p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+ p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements
.into(TypeDescriptors.strings())
.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b980715..b0fab0b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -106,7 +106,7 @@ public class WordCountTest {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(WordCountOptions.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new ExtractWordsFn()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new FormatAsStringFn()))
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index fcd23cf..d47da45 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Enable the FileSystems API to know about gs:// URIs in this test.
FileSystems.setDefaultConfigInWorkers(options);
- p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(p);
@@ -465,7 +465,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
// Create a pipeline that the predefined step will be embedded into
Pipeline pipeline = Pipeline.create(options);
- pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+ pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(ParDo.of(new NoOpFn()))
.apply(new EmbeddedTransform(predefinedStep.clone()))
.apply(ParDo.of(new NoOpFn()));
@@ -523,7 +523,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);
String stepName = "DoFn1";
- pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+ pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
.apply(stepName, ParDo.of(new NoOpFn()))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
@@ -723,7 +723,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
}
private void applyRead(Pipeline pipeline, String path) {
- pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+ pipeline.apply("Read(" + path + ")", TextIO.read().from(path));
}
/**
@@ -736,7 +736,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
- pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+ pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz"));
// Check that translation does fail.
thrown.expectCause(allOf(
@@ -766,7 +766,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
Pipeline pipeline = Pipeline.create(options);
DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
- pipeline.apply(TextIO.Read.from(new TestValueProvider()));
+ pipeline.apply(TextIO.read().from(new TestValueProvider()));
// Check that translation does not fail.
t.translate(
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 480591e..7261fe9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -172,7 +171,7 @@ public class DataflowRunnerTest {
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
- p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
// Enable the FileSystems API to know about gs:// URIs in this test.
@@ -335,7 +334,7 @@ public class DataflowRunnerTest {
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
p
- .apply(TextIO.Read.from(options.getInput()))
+ .apply(TextIO.read().from(options.getInput()))
.apply(TextIO.Write.to(options.getOutput()));
}
@@ -347,7 +346,7 @@ public class DataflowRunnerTest {
DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
Pipeline p = buildDataflowPipeline(dataflowOptions);
- PCollection<String> unconsumed = p.apply(Read.from(options.getInput()));
+ PCollection<String> unconsumed = p.apply(TextIO.read().from(options.getInput()));
DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
p.traverseTopologically(new PipelineVisitor.Defaults() {
@@ -570,7 +569,7 @@ public class DataflowRunnerTest {
@Test
public void testNonGcsFilePathInReadFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
+ p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath()));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -587,7 +586,7 @@ public class DataflowRunnerTest {
public void testNonGcsFilePathInWriteFailure() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"))
+ p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
thrown.expect(IllegalArgumentException.class);
@@ -598,7 +597,7 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileReadPath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
+ p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file"));
thrown.expectCause(Matchers.allOf(
instanceOf(IllegalArgumentException.class),
@@ -613,7 +612,7 @@ public class DataflowRunnerTest {
@Test
public void testMultiSlashGcsFileWritePath() throws IOException {
Pipeline p = buildDataflowPipeline(buildPipelineOptions());
- PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
+ PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
thrown.expect(IllegalArgumentException.class);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 0e6faad..2bcf140 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -125,7 +125,7 @@ public class WordCount {
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
- p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+ p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.Write.to(options.getOutput()));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index d578a7a..7cb9386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -79,11 +79,11 @@ import org.slf4j.LoggerFactory;
* // A root PTransform, like TextIO.Read or Create, gets added
* // to the Pipeline by being applied:
* PCollection<String> lines =
- * p.apply(TextIO.Read.from("gs://bucket/dir/file*.txt"));
+ * p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
*
* // A Pipeline can have multiple root transforms:
* PCollection<String> moreLines =
- * p.apply(TextIO.Read.from("gs://bucket/other/dir/file*.txt"));
+ * p.apply(TextIO.read().from("gs://bucket/other/dir/file*.txt"));
* PCollection<String> yetMoreLines =
* p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of()));
*
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f735019..31d2c3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -73,7 +73,7 @@ import org.apache.beam.sdk.values.PDone;
*
* // A simple Read of a local file (only runs locally):
* PCollection<String> lines =
- * p.apply(TextIO.Read.from("/local/path/to/file.txt"));
+ * p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
* <p>To write a {@link PCollection} to one or more text files, use
@@ -109,174 +109,131 @@ import org.apache.beam.sdk.values.PDone;
*/
public class TextIO {
/**
- * A {@link PTransform} that reads from a text file (or multiple text
- * files matching a pattern) and returns a {@link PCollection} containing
- * the decoding of each of the lines of the text file(s) as a {@link String}.
+ * Reads from one or more text files and returns a bounded {@link PCollection} containing one
+ * element for each line of the input files.
*/
- public static class Read {
+ public static Read read() {
+ return new Read();
+ }
- /**
- * Returns a transform for reading text files that reads from the file(s)
- * with the given filename or filename pattern. This can be a local path (if running locally),
- * or a Google Cloud Storage filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"} (if running locally or using remote execution)
- * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
- * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
- */
- public static Bound from(String filepattern) {
- return new Bound().from(filepattern);
+ /** Implementation of {@link #read}. */
+ public static class Read extends PTransform<PBegin, PCollection<String>> {
+ /** The filepattern to read from. */
+ @Nullable private final ValueProvider<String> filepattern;
+
+ /** Option to indicate the input source's compression type. Default is AUTO. */
+ private final TextIO.CompressionType compressionType;
+
+ private Read() {
+ this(null, null, TextIO.CompressionType.AUTO);
}
- /**
- * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
- */
- public static Bound from(ValueProvider<String> filepattern) {
- return new Bound().from(filepattern);
+ private Read(
+ @Nullable String name,
+ @Nullable ValueProvider<String> filepattern,
+ TextIO.CompressionType compressionType) {
+ super(name);
+ this.filepattern = filepattern;
+ this.compressionType = compressionType;
}
/**
- * Returns a transform for reading text files that decompresses all input files
- * using the specified compression type.
* Reads from the file(s) with the given filename or filename pattern.
*
- * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
- * In this mode, the compression type of the file is determined by its extension
- * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are
- * uncompressed).
+ * <p>This can be a local path (if running locally), or a Google Cloud Storage filename or
+ * filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if running locally or using
+ * a remote execution service).
+ *
+ * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+ * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
*/
- public static Bound withCompressionType(TextIO.CompressionType compressionType) {
- return new Bound().withCompressionType(compressionType);
+ public Read from(String filepattern) {
+ checkNotNull(filepattern, "Filepattern cannot be null.");
+ return new Read(name, StaticValueProvider.of(filepattern), compressionType);
}
- // TODO: strippingNewlines, etc.
+ /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
+ public Read from(ValueProvider<String> filepattern) {
+ checkNotNull(filepattern, "Filepattern cannot be null.");
+ return new Read(name, filepattern, compressionType);
+ }
/**
- * A {@link PTransform} that reads from one or more text files and returns a bounded
- * {@link PCollection} containing one element for each line of the input files.
+ * Returns a new transform for reading from text files that's like this one but
+ * reads from input sources using the specified compression type.
+ *
+ * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
*/
- public static class Bound extends PTransform<PBegin, PCollection<String>> {
- /** The filepattern to read from. */
- @Nullable private final ValueProvider<String> filepattern;
-
- /** Option to indicate the input source's compression type. Default is AUTO. */
- private final TextIO.CompressionType compressionType;
-
- private Bound() {
- this(null, null, TextIO.CompressionType.AUTO);
- }
-
- private Bound(
- @Nullable String name,
- @Nullable ValueProvider<String> filepattern,
- TextIO.CompressionType compressionType) {
- super(name);
- this.filepattern = filepattern;
- this.compressionType = compressionType;
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
- * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
- * for a description of filepatterns.
- *
- * <p>Does not modify this object.
-
- */
- public Bound from(String filepattern) {
- checkNotNull(filepattern, "Filepattern cannot be empty.");
- return new Bound(name, StaticValueProvider.of(filepattern), compressionType);
- }
-
- /**
- * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
- */
- public Bound from(ValueProvider<String> filepattern) {
- checkNotNull(filepattern, "Filepattern cannot be empty.");
- return new Bound(name, filepattern, compressionType);
- }
-
- /**
- * Returns a new transform for reading from text files that's like this one but
- * reads from input sources using the specified compression type.
- *
- * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
- * See {@link TextIO.Read#withCompressionType} for more details.
- *
- * <p>Does not modify this object.
- */
- public Bound withCompressionType(TextIO.CompressionType compressionType) {
- return new Bound(name, filepattern, compressionType);
- }
-
- @Override
- public PCollection<String> expand(PBegin input) {
- if (filepattern == null) {
- throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
- }
-
- final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
- PCollection<String> pcol = input.getPipeline().apply("Read", read);
- // Honor the default output coder that would have been used by this PTransform.
- pcol.setCoder(getDefaultOutputCoder());
- return pcol;
- }
+ public Read withCompressionType(TextIO.CompressionType compressionType) {
+ return new Read(name, filepattern, compressionType);
+ }
- // Helper to create a source specific to the requested compression type.
- protected FileBasedSource<String> getSource() {
- switch (compressionType) {
- case UNCOMPRESSED:
- return new TextSource(filepattern);
- case AUTO:
- return CompressedSource.from(new TextSource(filepattern));
- case BZIP2:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.BZIP2);
- case GZIP:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.GZIP);
- case ZIP:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.ZIP);
- case DEFLATE:
- return
- CompressedSource.from(new TextSource(filepattern))
- .withDecompression(CompressedSource.CompressionMode.DEFLATE);
- default:
- throw new IllegalArgumentException("Unknown compression type: " + compressionType);
- }
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ if (filepattern == null) {
+ throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
+ final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
+ PCollection<String> pcol = input.getPipeline().apply("Read", read);
+ // Honor the default output coder that would have been used by this PTransform.
+ pcol.setCoder(getDefaultOutputCoder());
+ return pcol;
+ }
- String filepatternDisplay = filepattern.isAccessible()
- ? filepattern.get() : filepattern.toString();
- builder
- .add(DisplayData.item("compressionType", compressionType.toString())
- .withLabel("Compression Type"))
- .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
- .withLabel("File Pattern"));
+ // Helper to create a source specific to the requested compression type.
+ protected FileBasedSource<String> getSource() {
+ switch (compressionType) {
+ case UNCOMPRESSED:
+ return new TextSource(filepattern);
+ case AUTO:
+ return CompressedSource.from(new TextSource(filepattern));
+ case BZIP2:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.BZIP2);
+ case GZIP:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.GZIP);
+ case ZIP:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.ZIP);
+ case DEFLATE:
+ return
+ CompressedSource.from(new TextSource(filepattern))
+ .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+ default:
+ throw new IllegalArgumentException("Unknown compression type: " + compressionType);
}
+ }
- @Override
- protected Coder<String> getDefaultOutputCoder() {
- return StringUtf8Coder.of();
- }
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ String filepatternDisplay = filepattern.isAccessible()
+ ? filepattern.get() : filepattern.toString();
+ builder
+ .add(DisplayData.item("compressionType", compressionType.toString())
+ .withLabel("Compression Type"))
+ .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+ .withLabel("File Pattern"));
+ }
- public String getFilepattern() {
- return filepattern.get();
- }
+ @Override
+ protected Coder<String> getDefaultOutputCoder() {
+ return StringUtf8Coder.of();
+ }
- public TextIO.CompressionType getCompressionType() {
- return compressionType;
- }
+ public String getFilepattern() {
+ return filepattern.get();
}
- /** Disallow construction of utility classes. */
- private Read() {}
+ public TextIO.CompressionType getCompressionType() {
+ return compressionType;
+ }
}
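The getSource() helper above maps each CompressionType onto a TextSource, wrapping it in a CompressedSource for every mode except UNCOMPRESSED. A minimal sketch of forcing a specific codec from user code (the file pattern is illustrative):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class CompressedReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // The default, CompressionType.AUTO, infers the codec from the file
    // extension; passing GZIP decompresses with gzip regardless of extension.
    PCollection<String> lines =
        p.apply(TextIO.read()
            .from("gs://bucket/logs/part-*")
            .withCompressionType(TextIO.CompressionType.GZIP));
    p.run();
  }
}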
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2ba1797..8a7965c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -220,7 +220,7 @@ public class TextIOTest {
}
}
- TextIO.Read.Bound read = TextIO.Read.from(filename);
+ TextIO.Read read = TextIO.read().from(filename);
PCollection<String> output = p.apply(read);
@@ -246,15 +246,15 @@ public class TextIOTest {
assertEquals(
"TextIO.Read/Read.out",
- p.apply(TextIO.Read.from("somefile")).getName());
+ p.apply(TextIO.read().from("somefile")).getName());
assertEquals(
"MyRead/Read.out",
- p.apply("MyRead", TextIO.Read.from(emptyTxt.getPath())).getName());
+ p.apply("MyRead", TextIO.read().from(emptyTxt.getPath())).getName());
}
@Test
public void testReadDisplayData() {
- TextIO.Read.Bound read = TextIO.Read
+ TextIO.Read read = TextIO.read()
.from("foo.*")
.withCompressionType(BZIP2);
@@ -269,7 +269,7 @@ public class TextIOTest {
public void testPrimitiveReadDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- TextIO.Read.Bound read = TextIO.Read
+ TextIO.Read read = TextIO.read()
.from("foobar");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -572,15 +572,15 @@ public class TextIOTest {
RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
p
- .apply(TextIO.Read.from(options.getInput()))
+ .apply(TextIO.read().from(options.getInput()))
.apply(TextIO.Write.to(options.getOutput()));
}
@Test
public void testCompressionTypeIsSet() throws Exception {
- TextIO.Read.Bound read = TextIO.Read.from("/tmp/test");
+ TextIO.Read read = TextIO.read().from("/tmp/test");
assertEquals(AUTO, read.getCompressionType());
- read = TextIO.Read.from("/tmp/test").withCompressionType(GZIP);
+ read = TextIO.read().from("/tmp/test").withCompressionType(GZIP);
assertEquals(GZIP, read.getCompressionType());
}
@@ -597,14 +597,14 @@ public class TextIOTest {
}
/**
- * Helper method that runs TextIO.Read.from(filename).withCompressionType(compressionType)
+ * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType)
* and asserts that the results match the given expected output.
*/
private void assertReadingCompressedFileMatchesExpected(
File file, CompressionType compressionType, String[] expected) {
- TextIO.Read.Bound read =
- TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
+ TextIO.Read read =
+ TextIO.read().from(file.getPath()).withCompressionType(compressionType);
PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read);
PAssert.that(output).containsInAnyOrder(expected);
@@ -825,9 +825,9 @@ public class TextIOTest {
@Test
public void testTextIOGetName() {
- assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
+ assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
- assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
+ assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
}
@Test
@@ -1075,7 +1075,7 @@ public class TextIOTest {
// Sanity check: file is at least 2 bundles long.
assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
- FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
+ FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1092,7 +1092,7 @@ public class TextIOTest {
// Sanity check: file is at least 2 bundles long.
assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
- FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
+ FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1110,7 +1110,7 @@ public class TextIOTest {
assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
FileBasedSource<String> source =
- TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
+ TextIO.read().from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
@@ -1128,7 +1128,7 @@ public class TextIOTest {
assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
FileBasedSource<String> source =
- TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
+ TextIO.read().from(largeGz.getPath()).withCompressionType(GZIP).getSource();
List<? extends FileBasedSource<String>> splits =
source.split(desiredBundleSize, options);
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 07b6b4a..29d9774 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -121,7 +121,7 @@ public class TransformTreeTest {
final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample =
Sample.fixedSizeGlobally(10);
- p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
+ p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
.apply(sample)
.apply(Flatten.<String>iterables())
.apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
@@ -167,7 +167,7 @@ public class TransformTreeTest {
assertThat(transform, not(instanceOf(Combine.Globally.class)));
assertThat(transform, not(instanceOf(WriteFiles.class)));
if (transform instanceof Read.Bounded
- && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
+ && node.getEnclosingNode().getTransform() instanceof TextIO.Read) {
assertTrue(visited.add(TransformsSeen.READ));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 6937405..f3dc378 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -94,7 +94,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testSourceTransform() {
- PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.Read
+ PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.read()
.from("foo.*");
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 30b0311..5e6580b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -215,7 +215,7 @@ public class WindowingTest implements Serializable {
}
PCollection<String> output = p.begin()
- .apply("ReadLines", TextIO.Read.from(filename))
+ .apply("ReadLines", TextIO.read().from(filename))
.apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
.apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index a5957b5..cf86c36 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -221,7 +221,8 @@ public class HadoopFileSystemTest {
.as(HadoopFileSystemOptions.class);
options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
FileSystems.setDefaultConfigInWorkers(options);
- PCollection<String> pc = p.apply(TextIO.Read.from(testPath("testFile*").toString()));
+ PCollection<String> pc = p.apply(
+ TextIO.read().from(testPath("testFile*").toString()));
PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
p.run();
}
[5/7] beam git commit: Converts TextIO.Read to AutoValue
Posted by jk...@apache.org.
Converts TextIO.Read to AutoValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987b4e62
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987b4e62
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987b4e62
Branch: refs/heads/master
Commit: 987b4e626e9b5113778310dcb23b0b2d6c666194
Parents: 9631520
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:16:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 66 ++++++++------------
1 file changed, 27 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
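For readers unfamiliar with the pattern, a minimal sketch of the AutoValue shape this commit adopts; the class and property names below are simplified stand-ins, not the actual TextIO source:

import com.google.auto.value.AutoValue;

// AutoValue generates the immutable implementation (AutoValue_ReadSketch),
// including equals(), hashCode(), and the builder; only this abstract
// skeleton is written by hand.
@AutoValue
public abstract class ReadSketch {
  abstract String getFilepattern();

  // An abstract toBuilder() makes AutoValue generate a builder seeded with
  // this instance's values, which is what enables copy-on-write setters.
  abstract Builder toBuilder();

  public static ReadSketch create(String filepattern) {
    return new AutoValue_ReadSketch.Builder().setFilepattern(filepattern).build();
  }

  // Fluent configuration returns a modified copy, mirroring how from() and
  // withCompressionType() behave after this commit.
  public ReadSketch withFilepattern(String filepattern) {
    return toBuilder().setFilepattern(filepattern).build();
  }

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setFilepattern(String filepattern);
    abstract ReadSketch build();
  }
}

Compared with the hand-rolled constructors removed in the diff below, adding a property now means adding one accessor and one setter; the generated code handles equality and null checks.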
http://git-wip-us.apache.org/repos/asf/beam/blob/987b4e62/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 31d2c3d..f8670a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.io.IOException;
@@ -113,28 +114,23 @@ public class TextIO {
* element for each line of the input files.
*/
public static Read read() {
- return new Read();
+ return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
}
/** Implementation of {@link #read}. */
- public static class Read extends PTransform<PBegin, PCollection<String>> {
- /** The filepattern to read from. */
- @Nullable private final ValueProvider<String> filepattern;
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+ @Nullable abstract ValueProvider<String> getFilepattern();
+ abstract CompressionType getCompressionType();
- /** Option to indicate the input source's compression type. Default is AUTO. */
- private final TextIO.CompressionType compressionType;
+ abstract Builder toBuilder();
- private Read() {
- this(null, null, TextIO.CompressionType.AUTO);
- }
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder setCompressionType(CompressionType compressionType);
- private Read(
- @Nullable String name,
- @Nullable ValueProvider<String> filepattern,
- TextIO.CompressionType compressionType) {
- super(name);
- this.filepattern = filepattern;
- this.compressionType = compressionType;
+ abstract Read build();
}
/**
@@ -149,13 +145,13 @@ public class TextIO {
*/
public Read from(String filepattern) {
checkNotNull(filepattern, "Filepattern cannot be null.");
- return new Read(name, StaticValueProvider.of(filepattern), compressionType);
+ return from(StaticValueProvider.of(filepattern));
}
/** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
public Read from(ValueProvider<String> filepattern) {
checkNotNull(filepattern, "Filepattern cannot be null.");
- return new Read(name, filepattern, compressionType);
+ return toBuilder().setFilepattern(filepattern).build();
}
/**
@@ -165,12 +161,12 @@ public class TextIO {
* <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
*/
public Read withCompressionType(TextIO.CompressionType compressionType) {
- return new Read(name, filepattern, compressionType);
+ return toBuilder().setCompressionType(compressionType).build();
}
@Override
public PCollection<String> expand(PBegin input) {
- if (filepattern == null) {
+ if (getFilepattern() == null) {
throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
}
@@ -183,29 +179,29 @@ public class TextIO {
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
- switch (compressionType) {
+ switch (getCompressionType()) {
case UNCOMPRESSED:
- return new TextSource(filepattern);
+ return new TextSource(getFilepattern());
case AUTO:
- return CompressedSource.from(new TextSource(filepattern));
+ return CompressedSource.from(new TextSource(getFilepattern()));
case BZIP2:
return
- CompressedSource.from(new TextSource(filepattern))
+ CompressedSource.from(new TextSource(getFilepattern()))
.withDecompression(CompressedSource.CompressionMode.BZIP2);
case GZIP:
return
- CompressedSource.from(new TextSource(filepattern))
+ CompressedSource.from(new TextSource(getFilepattern()))
.withDecompression(CompressedSource.CompressionMode.GZIP);
case ZIP:
return
- CompressedSource.from(new TextSource(filepattern))
+ CompressedSource.from(new TextSource(getFilepattern()))
.withDecompression(CompressedSource.CompressionMode.ZIP);
case DEFLATE:
return
- CompressedSource.from(new TextSource(filepattern))
+ CompressedSource.from(new TextSource(getFilepattern()))
.withDecompression(CompressedSource.CompressionMode.DEFLATE);
default:
- throw new IllegalArgumentException("Unknown compression type: " + compressionType);
+ throw new IllegalArgumentException("Unknown compression type: " + getCompressionType());
}
}
@@ -213,10 +209,10 @@ public class TextIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- String filepatternDisplay = filepattern.isAccessible()
- ? filepattern.get() : filepattern.toString();
+ String filepatternDisplay = getFilepattern().isAccessible()
+ ? getFilepattern().get() : getFilepattern().toString();
builder
- .add(DisplayData.item("compressionType", compressionType.toString())
+ .add(DisplayData.item("compressionType", getCompressionType().toString())
.withLabel("Compression Type"))
.addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
.withLabel("File Pattern"));
@@ -226,14 +222,6 @@ public class TextIO {
protected Coder<String> getDefaultOutputCoder() {
return StringUtf8Coder.of();
}
-
- public String getFilepattern() {
- return filepattern.get();
- }
-
- public TextIO.CompressionType getCompressionType() {
- return compressionType;
- }
}