You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/02 19:23:19 UTC

[1/7] beam git commit: Moves TextSource and TextSink to top level

Repository: beam
Updated Branches:
  refs/heads/master c2c89eda9 -> 3161904d9


Moves TextSource and TextSink to top level


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b725c25
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b725c25
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b725c25

Branch: refs/heads/master
Commit: 7b725c25288ae24eb89be3bf61e09e0e38c2b200
Parents: 681b5d6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:46:44 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 343 +------------------
 .../java/org/apache/beam/sdk/io/TextSink.java   | 139 ++++++++
 .../java/org/apache/beam/sdk/io/TextSource.java | 236 +++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   3 +-
 4 files changed, 377 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 90d56e7..1f9b7a0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,20 +19,8 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
@@ -41,13 +29,10 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -412,7 +397,7 @@ public class TextIO {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
-      WriteFiles<String> write = null;
+      WriteFiles<String> write;
       if (getFilenamePolicy() != null) {
         write =
             WriteFiles.to(
@@ -535,330 +520,4 @@ public class TextIO {
 
   /** Disable construction of utility class. */
   private TextIO() {}
-
-  /**
-   * A {@link FileBasedSource} which can decode records delimited by newline characters.
-   *
-   * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
-   * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
-   * even if it is not delimited. Finally, no records are decoded if the stream is empty.
-   *
-   * <p>This source supports reading from any arbitrary byte position within the stream. If the
-   * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
-   * representing the beginning of the first record to be decoded.
-   */
-  @VisibleForTesting
-  static class TextSource extends FileBasedSource<String> {
-    /** The Coder to use to decode each line. */
-    @VisibleForTesting
-    TextSource(String fileSpec) {
-      super(StaticValueProvider.of(fileSpec), 1L);
-    }
-
-    @VisibleForTesting
-    TextSource(ValueProvider<String> fileSpec) {
-      super(fileSpec, 1L);
-    }
-
-    private TextSource(Metadata metadata, long start, long end) {
-      super(metadata, 1L, start, end);
-    }
-
-    @Override
-    protected FileBasedSource<String> createForSubrangeOfFile(
-        Metadata metadata,
-        long start,
-        long end) {
-      return new TextSource(metadata, start, end);
-    }
-
-    @Override
-    protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
-      return new TextBasedReader(this);
-    }
-
-    @Override
-    public Coder<String> getDefaultOutputCoder() {
-      return StringUtf8Coder.of();
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
-     * which can decode records delimited by newline characters.
-     *
-     * <p>See {@link TextSource} for further details.
-     */
-    @VisibleForTesting
-    static class TextBasedReader extends FileBasedReader<String> {
-      private static final int READ_BUFFER_SIZE = 8192;
-      private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
-      private ByteString buffer;
-      private int startOfSeparatorInBuffer;
-      private int endOfSeparatorInBuffer;
-      private long startOfRecord;
-      private volatile long startOfNextRecord;
-      private volatile boolean eof;
-      private volatile boolean elementIsPresent;
-      private String currentValue;
-      private ReadableByteChannel inChannel;
-
-      private TextBasedReader(TextSource source) {
-        super(source);
-        buffer = ByteString.EMPTY;
-      }
-
-      @Override
-      protected long getCurrentOffset() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return startOfRecord;
-      }
-
-      @Override
-      public long getSplitPointsRemaining() {
-        if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
-          return isDone() ? 0 : 1;
-        }
-        return super.getSplitPointsRemaining();
-      }
-
-      @Override
-      public String getCurrent() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return currentValue;
-      }
-
-      @Override
-      protected void startReading(ReadableByteChannel channel) throws IOException {
-        this.inChannel = channel;
-        // If the first offset is greater than zero, we need to skip bytes until we see our
-        // first separator.
-        if (getCurrentSource().getStartOffset() > 0) {
-          checkState(channel instanceof SeekableByteChannel,
-              "%s only supports reading from a SeekableByteChannel when given a start offset"
-              + " greater than 0.", TextSource.class.getSimpleName());
-          long requiredPosition = getCurrentSource().getStartOffset() - 1;
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          findSeparatorBounds();
-          buffer = buffer.substring(endOfSeparatorInBuffer);
-          startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
-          endOfSeparatorInBuffer = 0;
-          startOfSeparatorInBuffer = 0;
-        }
-      }
-
-      /**
-       * Locates the start position and end position of the next delimiter. Will
-       * consume the channel till either EOF or the delimiter bounds are found.
-       *
-       * <p>This fills the buffer and updates the positions as follows:
-       * <pre>{@code
-       * ------------------------------------------------------
-       * | element bytes | delimiter bytes | unconsumed bytes |
-       * ------------------------------------------------------
-       * 0            start of          end of              buffer
-       *              separator         separator           size
-       *              in buffer         in buffer
-       * }</pre>
-       */
-      private void findSeparatorBounds() throws IOException {
-        int bytePositionInBuffer = 0;
-        while (true) {
-          if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
-            startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
-            break;
-          }
-
-          byte currentByte = buffer.byteAt(bytePositionInBuffer);
-
-          if (currentByte == '\n') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-            break;
-          } else if (currentByte == '\r') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
-            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
-              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
-              if (currentByte == '\n') {
-                endOfSeparatorInBuffer += 1;
-              }
-            }
-            break;
-          }
-
-          // Move to the next byte in buffer.
-          bytePositionInBuffer += 1;
-        }
-      }
-
-      @Override
-      protected boolean readNextRecord() throws IOException {
-        startOfRecord = startOfNextRecord;
-        findSeparatorBounds();
-
-        // If we have reached EOF file and consumed all of the buffer then we know
-        // that there are no more records.
-        if (eof && buffer.size() == 0) {
-          elementIsPresent = false;
-          return false;
-        }
-
-        decodeCurrentElement();
-        startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
-        return true;
-      }
-
-      /**
-       * Decodes the current element updating the buffer to only contain the unconsumed bytes.
-       *
-       * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
-       * {@code endOfSeparatorInBuffer}.
-       */
-      private void decodeCurrentElement() throws IOException {
-        ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
-        currentValue = dataToDecode.toStringUtf8();
-        elementIsPresent = true;
-        buffer = buffer.substring(endOfSeparatorInBuffer);
-      }
-
-      /**
-       * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
-       */
-      private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
-        // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
-        // attempt to read more bytes.
-        while (buffer.size() <= minCapacity && !eof) {
-          eof = inChannel.read(readBuffer) == -1;
-          readBuffer.flip();
-          buffer = buffer.concat(ByteString.copyFrom(readBuffer));
-          readBuffer.clear();
-        }
-        // Return true if we were able to honor the minimum buffer capacity request
-        return buffer.size() >= minCapacity;
-      }
-    }
-  }
-
-  /**
-   * A {@link FileBasedSink} for text files. Produces text files with the newline separator
-   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
-   * Each record (including the last) is terminated.
-   */
-  @VisibleForTesting
-  static class TextSink extends FileBasedSink<String> {
-    @Nullable private final String header;
-    @Nullable private final String footer;
-
-    @VisibleForTesting
-    TextSink(FilenamePolicy filenamePolicy, @Nullable String header, @Nullable String footer,
-             WritableByteChannelFactory writableByteChannelFactory) {
-      super(filenamePolicy, writableByteChannelFactory);
-      this.header = header;
-      this.footer = footer;
-    }
-    @VisibleForTesting
-    TextSink(
-        ValueProvider<String> baseOutputFilename,
-        String extension,
-        @Nullable String header,
-        @Nullable String footer,
-        String fileNameTemplate,
-        WritableByteChannelFactory writableByteChannelFactory) {
-      super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
-      this.header = header;
-      this.footer = footer;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<String> createWriteOperation() {
-      return new TextWriteOperation(this, header, footer);
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for text files.
-     */
-    private static class TextWriteOperation extends FileBasedWriteOperation<String> {
-      @Nullable private final String header;
-      @Nullable private final String footer;
-
-      private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
-        super(sink);
-        this.header = header;
-        this.footer = footer;
-      }
-
-      @Override
-      public FileBasedWriter createWriter(PipelineOptions options) throws Exception {
-        return new TextWriter(this, header, footer);
-      }
-    }
-
-    /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
-     * for text files.
-     */
-    private static class TextWriter extends FileBasedWriter<String> {
-      private static final String NEWLINE = "\n";
-      @Nullable private final String header;
-      @Nullable private final String footer;
-      private OutputStreamWriter out;
-
-      public TextWriter(
-          FileBasedWriteOperation<String> writeOperation,
-          @Nullable String header,
-          @Nullable String footer) {
-        super(writeOperation, MimeTypes.TEXT);
-        this.header = header;
-        this.footer = footer;
-      }
-
-      /**
-       * Writes {@code value} followed by a newline character if {@code value} is not null.
-       */
-      private void writeIfNotNull(@Nullable String value) throws IOException {
-        if (value != null) {
-          writeLine(value);
-        }
-      }
-
-      /**
-       * Writes {@code value} followed by newline character.
-       */
-      private void writeLine(String value) throws IOException {
-        out.write(value);
-        out.write(NEWLINE);
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
-      }
-
-      @Override
-      protected void writeHeader() throws Exception {
-        writeIfNotNull(header);
-      }
-
-      @Override
-      public void write(String value) throws Exception {
-        writeLine(value);
-      }
-
-      @Override
-      protected void writeFooter() throws Exception {
-        writeIfNotNull(footer);
-      }
-
-      @Override
-      protected void finishWrite() throws Exception {
-        out.flush();
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
new file mode 100644
index 0000000..4efdc32
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/**
+ * Implementation detail of {@link TextIO.Write}.
+ *
+ * <p>A {@link FileBasedSink} for text files. Produces text files with the newline separator {@code
+ * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
+ * last) is terminated.
+ */
+class TextSink extends FileBasedSink<String> {
+  @Nullable private final String header;
+  @Nullable private final String footer;
+
+  TextSink(
+      FilenamePolicy filenamePolicy,
+      @Nullable String header,
+      @Nullable String footer,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(filenamePolicy, writableByteChannelFactory);
+    this.header = header;
+    this.footer = footer;
+  }
+
+  TextSink(
+      ValueProvider<String> baseOutputFilename,
+      String extension,
+      @Nullable String header,
+      @Nullable String footer,
+      String fileNameTemplate,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
+    this.header = header;
+    this.footer = footer;
+  }
+
+  @Override
+  public FileBasedWriteOperation<String> createWriteOperation() {
+    return new TextWriteOperation(this, header, footer);
+  }
+
+  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */
+  private static class TextWriteOperation extends FileBasedWriteOperation<String> {
+    @Nullable private final String header;
+    @Nullable private final String footer;
+
+    private TextWriteOperation(TextSink sink, @Nullable String header, @Nullable String footer) {
+      super(sink);
+      this.header = header;
+      this.footer = footer;
+    }
+
+    @Override
+    public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+      return new TextWriter(this, header, footer);
+    }
+  }
+
+  /** A {@link FileBasedWriter FileBasedWriter} for text files. */
+  private static class TextWriter extends FileBasedWriter<String> {
+    private static final String NEWLINE = "\n";
+    @Nullable private final String header;
+    @Nullable private final String footer;
+    private OutputStreamWriter out;
+
+    public TextWriter(
+        FileBasedWriteOperation<String> writeOperation,
+        @Nullable String header,
+        @Nullable String footer) {
+      super(writeOperation, MimeTypes.TEXT);
+      this.header = header;
+      this.footer = footer;
+    }
+
+    /** Writes {@code value} followed by a newline character if {@code value} is not null. */
+    private void writeIfNotNull(@Nullable String value) throws IOException {
+      if (value != null) {
+        writeLine(value);
+      }
+    }
+
+    /** Writes {@code value} followed by newline character. */
+    private void writeLine(String value) throws IOException {
+      out.write(value);
+      out.write(NEWLINE);
+    }
+
+    @Override
+    protected void prepareWrite(WritableByteChannel channel) throws Exception {
+      out = new OutputStreamWriter(Channels.newOutputStream(channel), StandardCharsets.UTF_8);
+    }
+
+    @Override
+    protected void writeHeader() throws Exception {
+      writeIfNotNull(header);
+    }
+
+    @Override
+    public void write(String value) throws Exception {
+      writeLine(value);
+    }
+
+    @Override
+    protected void writeFooter() throws Exception {
+      writeIfNotNull(footer);
+    }
+
+    @Override
+    protected void finishWrite() throws Exception {
+      out.flush();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
new file mode 100644
index 0000000..4d9fa77
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+
+/**
+ * Implementation detail of {@link TextIO.Read}.
+ *
+ * <p>A {@link FileBasedSource} which can decode records delimited by newline characters.
+ *
+ * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
+ * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
+ * even if it is not delimited. Finally, no records are decoded if the stream is empty.
+ *
+ * <p>This source supports reading from any arbitrary byte position within the stream. If the
+ * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
+ * representing the beginning of the first record to be decoded.
+ */
+@VisibleForTesting
+class TextSource extends FileBasedSource<String> {
+  TextSource(ValueProvider<String> fileSpec) {
+    super(fileSpec, 1L);
+  }
+
+  private TextSource(MatchResult.Metadata metadata, long start, long end) {
+    super(metadata, 1L, start, end);
+  }
+
+  @Override
+  protected FileBasedSource<String> createForSubrangeOfFile(
+      MatchResult.Metadata metadata,
+      long start,
+      long end) {
+    return new TextSource(metadata, start, end);
+  }
+
+  @Override
+  protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
+    return new TextBasedReader(this);
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder() {
+    return StringUtf8Coder.of();
+  }
+
+  /**
+   * A {@link FileBasedReader FileBasedReader}
+   * which can decode records delimited by newline characters.
+   *
+   * <p>See {@link TextSource} for further details.
+   */
+  @VisibleForTesting
+  static class TextBasedReader extends FileBasedReader<String> {
+    private static final int READ_BUFFER_SIZE = 8192;
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
+    private ByteString buffer;
+    private int startOfSeparatorInBuffer;
+    private int endOfSeparatorInBuffer;
+    private long startOfRecord;
+    private volatile long startOfNextRecord;
+    private volatile boolean eof;
+    private volatile boolean elementIsPresent;
+    private String currentValue;
+    private ReadableByteChannel inChannel;
+
+    private TextBasedReader(TextSource source) {
+      super(source);
+      buffer = ByteString.EMPTY;
+    }
+
+    @Override
+    protected long getCurrentOffset() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return startOfRecord;
+    }
+
+    @Override
+    public long getSplitPointsRemaining() {
+      if (isStarted() && startOfNextRecord >= getCurrentSource().getEndOffset()) {
+        return isDone() ? 0 : 1;
+      }
+      return super.getSplitPointsRemaining();
+    }
+
+    @Override
+    public String getCurrent() throws NoSuchElementException {
+      if (!elementIsPresent) {
+        throw new NoSuchElementException();
+      }
+      return currentValue;
+    }
+
+    @Override
+    protected void startReading(ReadableByteChannel channel) throws IOException {
+      this.inChannel = channel;
+      // If the first offset is greater than zero, we need to skip bytes until we see our
+      // first separator.
+      if (getCurrentSource().getStartOffset() > 0) {
+        checkState(channel instanceof SeekableByteChannel,
+            "%s only supports reading from a SeekableByteChannel when given a start offset"
+            + " greater than 0.", TextSource.class.getSimpleName());
+        long requiredPosition = getCurrentSource().getStartOffset() - 1;
+        ((SeekableByteChannel) channel).position(requiredPosition);
+        findSeparatorBounds();
+        buffer = buffer.substring(endOfSeparatorInBuffer);
+        startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
+        endOfSeparatorInBuffer = 0;
+        startOfSeparatorInBuffer = 0;
+      }
+    }
+
+    /**
+     * Locates the start position and end position of the next delimiter. Will
+     * consume the channel till either EOF or the delimiter bounds are found.
+     *
+     * <p>This fills the buffer and updates the positions as follows:
+     * <pre>{@code
+     * ------------------------------------------------------
+     * | element bytes | delimiter bytes | unconsumed bytes |
+     * ------------------------------------------------------
+     * 0            start of          end of              buffer
+     *              separator         separator           size
+     *              in buffer         in buffer
+     * }</pre>
+     */
+    private void findSeparatorBounds() throws IOException {
+      int bytePositionInBuffer = 0;
+      while (true) {
+        if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
+          startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
+          break;
+        }
+
+        byte currentByte = buffer.byteAt(bytePositionInBuffer);
+
+        if (currentByte == '\n') {
+          startOfSeparatorInBuffer = bytePositionInBuffer;
+          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+          break;
+        } else if (currentByte == '\r') {
+          startOfSeparatorInBuffer = bytePositionInBuffer;
+          endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
+
+          if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
+            currentByte = buffer.byteAt(bytePositionInBuffer + 1);
+            if (currentByte == '\n') {
+              endOfSeparatorInBuffer += 1;
+            }
+          }
+          break;
+        }
+
+        // Move to the next byte in buffer.
+        bytePositionInBuffer += 1;
+      }
+    }
+
+    @Override
+    protected boolean readNextRecord() throws IOException {
+      startOfRecord = startOfNextRecord;
+      findSeparatorBounds();
+
+      // If we have reached EOF file and consumed all of the buffer then we know
+      // that there are no more records.
+      if (eof && buffer.size() == 0) {
+        elementIsPresent = false;
+        return false;
+      }
+
+      decodeCurrentElement();
+      startOfNextRecord = startOfRecord + endOfSeparatorInBuffer;
+      return true;
+    }
+
+    /**
+     * Decodes the current element updating the buffer to only contain the unconsumed bytes.
+     *
+     * <p>This invalidates the currently stored {@code startOfSeparatorInBuffer} and
+     * {@code endOfSeparatorInBuffer}.
+     */
+    private void decodeCurrentElement() throws IOException {
+      ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
+      currentValue = dataToDecode.toStringUtf8();
+      elementIsPresent = true;
+      buffer = buffer.substring(endOfSeparatorInBuffer);
+    }
+
+    /**
+     * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
+     */
+    private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
+      // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
+      // attempt to read more bytes.
+      while (buffer.size() <= minCapacity && !eof) {
+        eof = inChannel.read(readBuffer) == -1;
+        readBuffer.flip();
+        buffer = buffer.concat(ByteString.copyFrom(readBuffer));
+        readBuffer.clear();
+      }
+      // Return true if we were able to honor the minimum buffer capacity request
+      return buffer.size() >= minCapacity;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b725c25/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 425e0d6..f30b52f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
-import org.apache.beam.sdk.io.TextIO.TextSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -1064,7 +1063,7 @@ public class TextIOTest {
   private TextSource prepareSource(byte[] data) throws IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new TextSource(path.toString());
+    return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
   }
 
   @Test


[4/7] beam git commit: Removes leftover no-op withoutValidation methods

Posted by jk...@apache.org.
Removes leftover no-op withoutValidation methods


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef4658a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef4658a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef4658a5

Branch: refs/heads/master
Commit: ef4658a5d09fd77eeee3ba696da6972b9958bfb4
Parents: c2c89ed
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:14:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../DataflowPipelineTranslatorTest.java         |  2 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  6 +--
 .../java/org/apache/beam/sdk/io/TextIO.java     | 41 --------------------
 .../display/DisplayDataEvaluatorTest.java       |  3 +-
 4 files changed, 5 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 63e1166..fcd23cf 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -766,7 +766,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline pipeline = Pipeline.create(options);
     DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 
-    pipeline.apply(TextIO.Read.from(new TestValueProvider()).withoutValidation());
+    pipeline.apply(TextIO.Read.from(new TestValueProvider()));
 
     // Check that translation does not fail.
     t.translate(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index e3c884b..480591e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -335,8 +335,8 @@ public class DataflowRunnerTest {
     RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
     Pipeline p = buildDataflowPipeline(dataflowOptions);
     p
-        .apply(TextIO.Read.from(options.getInput()).withoutValidation())
-        .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
+        .apply(TextIO.Read.from(options.getInput()))
+        .apply(TextIO.Write.to(options.getOutput()));
   }
 
   /**
@@ -347,7 +347,7 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
     RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
     Pipeline p = buildDataflowPipeline(dataflowOptions);
-    PCollection<String> unconsumed = p.apply(Read.from(options.getInput()).withoutValidation());
+    PCollection<String> unconsumed = p.apply(Read.from(options.getInput()));
     DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
     final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
     p.traverseTopologically(new PipelineVisitor.Defaults() {

http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 0947702..f735019 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -196,20 +196,6 @@ public class TextIO {
 
       /**
        * Returns a new transform for reading from text files that's like this one but
-       * that has GCS path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS input does not
-       * exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, filepattern, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
        * reads from input sources using the specified compression type.
        *
        * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
@@ -370,18 +356,6 @@ public class TextIO {
     }
 
     /**
-     * Returns a transform for writing to text files that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
-     */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
-    }
-
-    /**
      * Returns a transform for writing to text files that adds a header string to the files
      * it writes. Note that a newline character will be added after the header.
      *
@@ -579,21 +553,6 @@ public class TextIO {
       }
 
       /**
-       * Returns a transform for writing to text files that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
        * Returns a transform for writing to text files that adds a header string to the files
        * it writes. Note that a newline character will be added after the header.
        *

http://git-wip-us.apache.org/repos/asf/beam/blob/ef4658a5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index feff333..6937405 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -95,8 +95,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
   @Test
   public void testSourceTransform() {
     PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.Read
-        .from("foo.*")
-        .withoutValidation();
+        .from("foo.*");
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(myTransform);


[6/7] beam git commit: Removes TextIO.Write.Bound

Posted by jk...@apache.org.
Removes TextIO.Write.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f5098dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f5098dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f5098dd

Branch: refs/heads/master
Commit: 4f5098ddf641f97417955ea33f429385d6fce384
Parents: 987b4e6
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:28:06 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   2 +-
 .../examples/common/WriteOneFilePerWindow.java  |   2 +-
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../beam/examples/cookbook/DistinctExample.java |   2 +-
 .../beam/examples/cookbook/JoinExamples.java    |   2 +-
 .../beam/examples/MinimalWordCountJava8.java    |   2 +-
 .../examples/MinimalWordCountJava8Test.java     |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../beam/runners/flink/ReadSourceITCase.java    |   2 +-
 .../flink/ReadSourceStreamingITCase.java        |   2 +-
 .../flink/streaming/GroupByNullKeyTest.java     |   2 +-
 .../streaming/TopWikipediaSessionsITCase.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   4 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   8 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   3 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../apache/beam/sdk/io/ShardNameTemplate.java   |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 553 +++++++------------
 .../org/apache/beam/sdk/io/package-info.java    |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  20 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   2 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java |   2 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |   2 +-
 28 files changed, 249 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 4a0c1bb..5ac8080 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -111,7 +111,7 @@ public class MinimalWordCount {
      // formatted strings) to a series of text files.
      //
      // By default, it will write to a set of files with names like wordcount-00001-of-00005
-     .apply(TextIO.Write.to("wordcounts"));
+     .apply(TextIO.write().to("wordcounts"));
 
     // Run the pipeline.
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e331a86..bfa7eb3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -179,7 +179,7 @@ public class WordCount {
     p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+     .apply("WriteCounts", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 6609828..461b46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -48,7 +48,7 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
   @Override
   public PDone expand(PCollection<String> input) {
     return input.apply(
-        TextIO.Write
+        TextIO.write()
             .to(new PerWindowFiles(filenamePrefix))
             .withWindowedWrites()
             .withNumShards(3));

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 1ef69c0..6fd9755 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -400,7 +400,7 @@ public class TfIdf {
                   c.element().getValue().getValue()));
             }
           }))
-          .apply(TextIO.Write
+          .apply(TextIO.write()
               .to(output)
               .withSuffix(".csv"));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index bb8c8bc..478e2dc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -210,7 +210,7 @@ public class TopWikipediaSessions {
     p.apply(TextIO.read().from(options.getInput()))
         .apply(MapElements.via(new ParseTableRowJson()))
         .apply(new ComputeTopSessions(samplingThreshold))
-        .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
+        .apply("Write", TextIO.write().withoutSharding().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 20c8fa0..bb16528 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -89,7 +89,7 @@ public class DistinctExample {
 
     p.apply("ReadLines", TextIO.read().from(options.getInput()))
      .apply(Distinct.<String>create())
-     .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
+     .apply("DedupedShakespeare", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 05a3ad3..d1fffb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -169,7 +169,7 @@ public class JoinExamples {
     PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
     PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
     PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    formattedResults.apply(TextIO.write().to(options.getOutput()));
     p.run().waitUntilFinish();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6dabadc..85c291d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -65,7 +65,7 @@ public class MinimalWordCountJava8 {
          .into(TypeDescriptors.strings())
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 689005a..e071b4e 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -71,7 +71,7 @@ public class MinimalWordCountJava8Test implements Serializable {
      .apply(MapElements
          .into(TypeDescriptors.strings())
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
-     .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
+     .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
   }
 
   private GcsUtil buildMockGcsUtil() throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b0fab0b..83af61b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -110,7 +110,7 @@ public class WordCountTest {
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))
-      .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()))
       ;
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 960640c..53d2ba3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -86,7 +86,7 @@ public class WriteWithShardingFactoryTest {
     String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
     // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
     // resharding should be automatically applied
-    p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
+    p.apply(Create.of(strs)).apply(TextIO.write().to(targetLocation));
 
     p.run();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 65d198e..5985da8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -76,7 +76,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
           }
         }));
 
-    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+    result.apply(TextIO.write().to(new URI(resultPath).getPath() + "/part"));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index 4f597c3..0707c21 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -65,7 +65,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
             c.output(c.element().toString());
           }
         }))
-      .apply(TextIO.Write.to(resultPath));
+      .apply(TextIO.write().to(resultPath));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 82d9f4f..2bd8e72 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -123,7 +123,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
               c.output(str.toString());
             }
           }));
-    output.apply(TextIO.Write.to(resultPath));
+    output.apply(TextIO.write().to(resultPath));
     p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9e6bba8..28335e3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -126,7 +126,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
       }
     }));
 
-    format.apply(TextIO.Write.to(resultPath));
+    format.apply(TextIO.write().to(resultPath));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d47da45..31c47b4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -138,7 +138,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     FileSystems.setDefaultConfigInWorkers(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
-     .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+     .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(p);
 
@@ -525,7 +525,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     String stepName = "DoFn1";
     pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
         .apply(stepName, ParDo.of(new NoOpFn()))
-        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
+        .apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);
     Job job =

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7261fe9..d011994 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -172,7 +172,7 @@ public class DataflowRunnerTest {
     Pipeline p = Pipeline.create(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+        .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
     FileSystems.setDefaultConfigInWorkers(options);
@@ -335,7 +335,7 @@ public class DataflowRunnerTest {
     Pipeline p = buildDataflowPipeline(dataflowOptions);
     p
         .apply(TextIO.read().from(options.getInput()))
-        .apply(TextIO.Write.to(options.getOutput()));
+        .apply(TextIO.write().to(options.getOutput()));
   }
 
   /**
@@ -587,7 +587,7 @@ public class DataflowRunnerTest {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
 
     p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
+        .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
@@ -613,7 +613,7 @@ public class DataflowRunnerTest {
   public void testMultiSlashGcsFileWritePath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
     PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
-    pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
+    pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 2bcf140..0779bd5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -128,7 +128,7 @@ public class WordCount {
     p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+     .apply("WriteCounts", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ce52b90..e43bc4e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -88,7 +88,7 @@ public class SparkRunnerDebuggerTest {
 
     wordCounts
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
+        .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
     final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
         + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index c936ed3..5021744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -73,7 +73,8 @@ public class NumShardsTest {
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-    output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+    output.apply(
+        TextIO.write().to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
     p.run().waitUntilFinish();
 
     int count = 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7cb9386..d4c46cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
  *     .apply(new Count<String>());
  * PCollection<String> formattedWordCounts =
  *     wordCounts.apply(ParDo.of(new FormatCounts()));
- * formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt"));
+ * formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
  *
  * // PTransforms aren't executed when they're applied, rather they're
  * // just added to the Pipeline.  Once the whole Pipeline of PTransforms

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
index 7f48a5c..cc85242 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
@@ -45,7 +45,7 @@ package org.apache.beam.sdk.io;
  *
  * <pre>{@code
  *   pipeline.apply(
- *       TextIO.Write.to("gs://bucket/path")
+ *       TextIO.write().to("gs://bucket/path")
  *                   .withShardNameTemplate("-SS-of-NN")
  *                   .withSuffix(".txt"))
  * }</pre>

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f8670a6..2d82572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -86,9 +86,9 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
- * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
  * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
  * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be
  * set, and unique windows and triggers must produce unique filenames.
  *
@@ -99,11 +99,11 @@ import org.apache.beam.sdk.values.PDone;
  * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"));
  *
  * // Same as above, only with Gzip compression:
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"));
  *      .withSuffix(".txt")
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
@@ -117,6 +117,15 @@ public class TextIO {
     return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
   }
 
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to text file (or
+   * multiple text files matching a sharding pattern), with each
+   * element of the input collection encoded into its own line.
+   */
+  public static Write write() {
+    return new Write();
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@@ -227,49 +236,105 @@ public class TextIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
-   */
-  public static class Write {
+  /** Implementation of {@link #write}. */
+  public static class Write extends PTransform<PCollection<String>, PDone> {
+    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+    /** The prefix of each file written, combined with suffix and shardTemplate. */
+    private final ValueProvider<String> filenamePrefix;
+    /** The suffix of each file written, combined with prefix and shardTemplate. */
+    private final String filenameSuffix;
+
+    /** An optional header to add to each file. */
+    @Nullable private final String header;
+
+    /** An optional footer to add to each file. */
+    @Nullable private final String footer;
+
+    /** Requested number of shards. 0 for automatic. */
+    private final int numShards;
+
+    /** The shard template of each file written, combined with prefix and suffix. */
+    private final String shardTemplate;
+
+    /** A policy for naming output files. */
+    private final FilenamePolicy filenamePolicy;
+
+    /** Whether to write windowed output files. */
+    private boolean windowedWrites;
+
+    /**
+     * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+     * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     */
+    private final WritableByteChannelFactory writableByteChannelFactory;
+
+    private Write() {
+      this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
+          FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
+    }
+
+    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+        @Nullable String header, @Nullable String footer, int numShards,
+        String shardTemplate,
+        WritableByteChannelFactory writableByteChannelFactory,
+        FilenamePolicy filenamePolicy,
+        boolean windowedWrites) {
+      super(name);
+      this.header = header;
+      this.footer = footer;
+      this.filenamePrefix = filenamePrefix;
+      this.filenameSuffix = filenameSuffix;
+      this.numShards = numShards;
+      this.shardTemplate = shardTemplate;
+      this.writableByteChannelFactory =
+          firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+      this.filenamePolicy = filenamePolicy;
+      this.windowedWrites = windowedWrites;
+    }
 
     /**
-     * Returns a transform for writing to text files that writes to the file(s)
-     * with the given prefix. This can be a local filename
+     * Writes to text files with the given prefix. This can be a local filename
      * (if running locally), or a Google Cloud Storage filename of
      * the form {@code "gs://<bucket>/<filepath>"}
      * (if running locally or using remote execution).
      *
      * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards(int)}, and end
-     * in a common extension, if given by {@link Bound#withSuffix(String)}.
+     * a shard identifier (see {@link #withNumShards(int)}, and end
+     * in a common extension, if given by {@link #withSuffix(String)}.
      */
-    public static Bound to(String prefix) {
-      return new Bound().to(prefix);
+    public Write to(String filenamePrefix) {
+      validateOutputComponent(filenamePrefix);
+      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+          header, footer, numShards, shardTemplate,
+          writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
-    public static Bound to(FilenamePolicy filenamePolicy) {
-      return new Bound().to(filenamePolicy);
-
+    /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
+    public Write to(ValueProvider<String> filenamePrefix) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
-    /**
-     * Like {@link #to(String)}, but with a {@link ValueProvider}.
-     */
-    public static Bound to(ValueProvider<String> prefix) {
-      return new Bound().to(prefix);
+
+    /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
+    public Write to(FilenamePolicy filenamePolicy) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that appends the specified suffix
-     * to the created files.
+     * Writes to the file(s) with the given filename suffix.
+     *
+     * @see ShardNameTemplate
      */
-    public static Bound withSuffix(String nameExtension) {
-      return new Bound().withSuffix(nameExtension);
+    public Write withSuffix(String nameExtension) {
+      validateOutputComponent(nameExtension);
+      return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that uses the provided shard count.
+     * Uses the provided shard count.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -277,371 +342,169 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
+     * @see ShardNameTemplate
      */
-    public static Bound withNumShards(int numShards) {
-      return new Bound().withNumShards(numShards);
+    public Write withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that uses the given shard name
-     * template.
+     * Uses the given shard name template.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * @see ShardNameTemplate
      */
-    public static Bound withShardNameTemplate(String shardTemplate) {
-      return new Bound().withShardNameTemplate(shardTemplate);
+    public Write withShardNameTemplate(String shardTemplate) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that forces a single file as
-     * output.
+     * Forces a single file as output.
+     *
+     * <p>Constraining the number of shards is likely to reduce
+     * the performance of a pipeline. Using this setting is not recommended
+     * unless you truly require a single output file.
+     *
+     * <p>This is a shortcut for
+     * {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public static Bound withoutSharding() {
-      return new Bound().withoutSharding();
+    public Write withoutSharding() {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
+          writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that adds a header string to the files
-     * it writes. Note that a newline character will be added after the header.
+     * Adds a header string to each file. A newline after the header is added automatically.
      *
      * <p>A {@code null} value will clear any previously configured header.
-     *
-     * @param header the string to be added as file header
      */
-    public static Bound withHeader(@Nullable String header) {
-      return new Bound().withHeader(header);
+    public Write withHeader(@Nullable String header) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that adds a footer string to the files
-     * it writes. Note that a newline character will be added after the header.
+     * Adds a footer string to each file. A newline after the footer is added automatically.
      *
      * <p>A {@code null} value will clear any previously configured footer.
-     *
-     * @param footer the string to be added as file footer
      */
-    public static Bound withFooter(@Nullable String footer) {
-      return new Bound().withFooter(footer);
+    public Write withFooter(@Nullable String footer) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
      * Returns a transform for writing to text files like this one but that has the given
-     * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
-     * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+     * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
      *
      * <p>A {@code null} value will reset the value to the default value mentioned above.
-     *
-     * @param writableByteChannelFactory the factory to be used during output
      */
-    public static Bound withWritableByteChannelFactory(
+    public Write withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
-      return new Bound().withWritableByteChannelFactory(writableByteChannelFactory);
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
-    // TODO: appendingNewlines, etc.
-
-    /**
-     * A PTransform that writes a bounded PCollection to a text file (or
-     * multiple text files matching a sharding pattern), with each
-     * PCollection element being encoded into its own line.
-     */
-    public static class Bound extends PTransform<PCollection<String>, PDone> {
-      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
-      /** The prefix of each file written, combined with suffix and shardTemplate. */
-      private final ValueProvider<String> filenamePrefix;
-      /** The suffix of each file written, combined with prefix and shardTemplate. */
-      private final String filenameSuffix;
-
-      /** An optional header to add to each file. */
-      @Nullable private final String header;
-
-      /** An optional footer to add to each file. */
-      @Nullable private final String footer;
-
-      /** Requested number of shards. 0 for automatic. */
-      private final int numShards;
-
-      /** The shard template of each file written, combined with prefix and suffix. */
-      private final String shardTemplate;
-
-      /** A policy for naming output files. */
-      private final FilenamePolicy filenamePolicy;
-
-      /** Whether to write windowed output files. */
-      private boolean windowedWrites;
-
-      /**
-       * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
-       * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
-       */
-      private final WritableByteChannelFactory writableByteChannelFactory;
-
-      private Bound() {
-        this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
-            FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
-      }
-
-      private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-          @Nullable String header, @Nullable String footer, int numShards,
-          String shardTemplate,
-          WritableByteChannelFactory writableByteChannelFactory,
-          FilenamePolicy filenamePolicy,
-          boolean windowedWrites) {
-        super(name);
-        this.header = header;
-        this.footer = footer;
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.writableByteChannelFactory =
-            firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
-        this.filenamePolicy = filenamePolicy;
-        this.windowedWrites = windowedWrites;
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
-            header, footer, numShards, shardTemplate,
-            writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Like {@link #to(String)}, but with a {@link ValueProvider}.
-       */
-      public Bound to(ValueProvider<String> filenamePrefix) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-       /**
-        * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
-        */
-      public Bound to(FilenamePolicy filenamePolicy) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withSuffix(String nameExtension) {
-        validateOutputComponent(nameExtension);
-        return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound withNumShards(int numShards) {
-        checkArgument(numShards >= 0);
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withShardNameTemplate(String shardTemplate) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutSharding() {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
-            writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
+    public Write withWindowedWrites() {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+    }
 
-      /**
-       * Returns a transform for writing to text files that adds a header string to the files
-       * it writes. Note that a newline character will be added after the header.
-       *
-       * <p>A {@code null} value will clear any previously configured header.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param header the string to be added as file header
-       */
-      public Bound withHeader(@Nullable String header) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+    @Override
+    public PDone expand(PCollection<String> input) {
+      if (filenamePolicy == null && filenamePrefix == null) {
+        throw new IllegalStateException(
+            "need to set the filename prefix of an TextIO.Write transform");
       }
-
-      /**
-       * Returns a transform for writing to text files that adds a footer string to the files
-       * it writes. Note that a newline character will be added after the header.
-       *
-       * <p>A {@code null} value will clear any previously configured footer.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param footer the string to be added as file footer
-       */
-      public Bound withFooter(@Nullable String footer) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      if (filenamePolicy != null && filenamePrefix != null) {
+        throw new IllegalStateException(
+            "cannot set both a filename policy and a filename prefix");
       }
-
-      /**
-       * Returns a transform for writing to text files like this one but that has the given
-       * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
-       * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
-       *
-       * <p>A {@code null} value will reset the value to the default value mentioned above.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param writableByteChannelFactory the factory to be used during output
-       */
-      public Bound withWritableByteChannelFactory(
-          WritableByteChannelFactory writableByteChannelFactory) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      WriteFiles<String> write = null;
+      if (filenamePolicy != null) {
+       write = WriteFiles.to(
+           new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+      } else {
+        write = WriteFiles.to(
+            new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+                writableByteChannelFactory));
       }
-
-      public Bound withWindowedWrites() {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+      if (getNumShards() > 0) {
+        write = write.withNumShards(getNumShards());
       }
-
-      @Override
-      public PDone expand(PCollection<String> input) {
-        if (filenamePolicy == null && filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of an TextIO.Write transform");
-        }
-        if (filenamePolicy != null && filenamePrefix != null) {
-          throw new IllegalStateException(
-              "cannot set both a filename policy and a filename prefix");
-        }
-        WriteFiles<String> write = null;
-        if (filenamePolicy != null) {
-         write = WriteFiles.to(
-             new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
-        } else {
-          write = WriteFiles.to(
-              new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                  writableByteChannelFactory));
-        }
-        if (getNumShards() > 0) {
-          write = write.withNumShards(getNumShards());
-        }
-        if (windowedWrites) {
-          write = write.withWindowedWrites();
-        }
-        return input.apply("WriteFiles", write);
+      if (windowedWrites) {
+        write = write.withWindowedWrites();
       }
+      return input.apply("WriteFiles", write);
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-        String prefixString = "";
-        if (filenamePrefix != null) {
-          prefixString = filenamePrefix.isAccessible()
-              ? filenamePrefix.get() : filenamePrefix.toString();
-        }
-        builder
-            .addIfNotNull(DisplayData.item("filePrefix", prefixString)
-              .withLabel("Output File Prefix"))
-            .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
-              .withLabel("Output File Suffix"), "")
-            .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
-              .withLabel("Output Shard Name Template"),
-                DEFAULT_SHARD_TEMPLATE)
-            .addIfNotDefault(DisplayData.item("numShards", numShards)
-              .withLabel("Maximum Output Shards"), 0)
-            .addIfNotNull(DisplayData.item("fileHeader", header)
-              .withLabel("File Header"))
-            .addIfNotNull(DisplayData.item("fileFooter", footer)
-                .withLabel("File Footer"))
-            .add(DisplayData
-                .item("writableByteChannelFactory", writableByteChannelFactory.toString())
-                .withLabel("Compression/Transformation Type"));
+      String prefixString = "";
+      if (filenamePrefix != null) {
+        prefixString = filenamePrefix.isAccessible()
+            ? filenamePrefix.get() : filenamePrefix.toString();
       }
+      builder
+          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+            .withLabel("Output File Prefix"))
+          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+            .withLabel("Output File Suffix"), "")
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+            .withLabel("Output Shard Name Template"),
+              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotDefault(DisplayData.item("numShards", numShards)
+            .withLabel("Maximum Output Shards"), 0)
+          .addIfNotNull(DisplayData.item("fileHeader", header)
+            .withLabel("File Header"))
+          .addIfNotNull(DisplayData.item("fileFooter", footer)
+              .withLabel("File Footer"))
+          .add(DisplayData
+              .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+              .withLabel("Compression/Transformation Type"));
+    }
 
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
+    /**
+     * Returns the current shard name template string.
+     */
+    public String getShardNameTemplate() {
+      return shardTemplate;
+    }
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      public String getFilenamePrefix() {
-        return filenamePrefix.get();
-      }
+    public String getFilenamePrefix() {
+      return filenamePrefix.get();
+    }
 
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
+    public String getShardTemplate() {
+      return shardTemplate;
+    }
 
-      public int getNumShards() {
-        return numShards;
-      }
+    public int getNumShards() {
+      return numShards;
+    }
 
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
 
-      @Nullable
-      public String getHeader() {
-        return header;
-      }
+    @Nullable
+    public String getHeader() {
+      return header;
+    }
 
-      @Nullable
-      public String getFooter() {
-        return footer;
-      }
+    @Nullable
+    public String getFooter() {
+      return footer;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c65d7dd..3fc8e32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -29,7 +29,7 @@
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code
  * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.write().to("gs://my_bucket/path/to/numbers"));
  * } </pre>
  */
 package org.apache.beam.sdk.io;

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8a7965c..095b69f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -299,8 +299,8 @@ public class TextIOTest {
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
 
-    TextIO.Write.Bound write =
-        TextIO.Write.to(baseFilename)
+    TextIO.Write write =
+        TextIO.write().to(baseFilename)
             .withHeader(header)
             .withFooter(footer);
 
@@ -463,7 +463,7 @@ public class TextIOTest {
 
     final WritableByteChannelFactory writableByteChannelFactory =
         new DrunkWritableByteChannelFactory();
-    TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+    TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString())
         .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
@@ -483,7 +483,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
@@ -504,7 +504,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenHeader() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withHeader("myHeader");
 
@@ -515,7 +515,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenFooter() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withFooter("myFooter");
 
@@ -534,7 +534,7 @@ public class TextIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    TextIO.Write.Bound write = TextIO.Write.to(outputPath);
+    TextIO.Write write = TextIO.write().to(outputPath);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("TextIO.Write should include the file prefix in its primitive display data",
@@ -553,7 +553,7 @@ public class TextIOTest {
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Output name components are not allowed to contain");
-    input.apply(TextIO.Write.to(filename));
+    input.apply(TextIO.write().to(filename));
   }
 
   /** Options for testing. */
@@ -573,7 +573,7 @@ public class TextIOTest {
 
     p
         .apply(TextIO.read().from(options.getInput()))
-        .apply(TextIO.Write.to(options.getOutput()));
+        .apply(TextIO.write().to(options.getOutput()));
   }
 
   @Test
@@ -826,7 +826,7 @@ public class TextIOTest {
   @Test
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
-    assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
+    assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
     assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 29d9774..6c3aba2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -124,7 +124,7 @@ public class TransformTreeTest {
     p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
         .apply(sample)
         .apply(Flatten.<String>iterables())
-        .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
+        .apply("WriteMyFile", TextIO.write().to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
         EnumSet.noneOf(TransformsSeen.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index c685a63..411f913 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -137,7 +137,7 @@ public class NameUtilsTest {
     assertEquals(
         "NameUtilsTest.SomeTransform",
         NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
-    assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
+    assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.class));
   }
 
   @AutoValue

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 7c9d1d9..b07a5b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -71,7 +71,7 @@ public class PDoneTest {
       return
           begin
           .apply(Create.of(LINES))
-          .apply(TextIO.Write.to(filename));
+          .apply(TextIO.write().to(filename));
     }
   }
 


[2/7] beam git commit: Converts TextIO.Write to AutoValue

Posted by jk...@apache.org.
Converts TextIO.Write to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/681b5d64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/681b5d64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/681b5d64

Branch: refs/heads/master
Commit: 681b5d6400e1d00169a06860506a46053a226003
Parents: 4f5098d
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:39:01 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 181 ++++++++-----------
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   4 +-
 3 files changed, 77 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index e43bc4e..ea058b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -100,7 +100,7 @@ public class SparkRunnerDebuggerTest {
         + ".SparkRunnerDebuggerTest$PlusOne())\n"
         + "sparkContext.union(...)\n"
         + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
-        + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>";
+        + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
 
     SparkRunnerDebugger.DebugSparkPipelineResult result =
         (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2d82572..90d56e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -123,7 +122,13 @@ public class TextIO {
    * element of the input collection encoded into its own line.
    */
   public static Write write() {
-    return new Write();
+    return new AutoValue_TextIO_Write.Builder()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
+        .setWindowedWrites(false)
+        .build();
   }
 
   /** Implementation of {@link #read}. */
@@ -237,60 +242,56 @@ public class TextIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, PDone> {
     private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
     /** The prefix of each file written, combined with suffix and shardTemplate. */
-    private final ValueProvider<String> filenamePrefix;
+    @Nullable abstract ValueProvider<String> getFilenamePrefix();
+
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    private final String filenameSuffix;
+    abstract String getFilenameSuffix();
 
     /** An optional header to add to each file. */
-    @Nullable private final String header;
+    @Nullable abstract String getHeader();
 
     /** An optional footer to add to each file. */
-    @Nullable private final String footer;
+    @Nullable abstract String getFooter();
 
     /** Requested number of shards. 0 for automatic. */
-    private final int numShards;
+    abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    private final String shardTemplate;
+    abstract String getShardTemplate();
 
     /** A policy for naming output files. */
-    private final FilenamePolicy filenamePolicy;
+    @Nullable abstract FilenamePolicy getFilenamePolicy();
 
     /** Whether to write windowed output files. */
-    private boolean windowedWrites;
+    abstract boolean getWindowedWrites();
 
     /**
      * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
      * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
      */
-    private final WritableByteChannelFactory writableByteChannelFactory;
+    abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    private Write() {
-      this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
-          FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
-    }
+    abstract Builder toBuilder();
 
-    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-        @Nullable String header, @Nullable String footer, int numShards,
-        String shardTemplate,
-        WritableByteChannelFactory writableByteChannelFactory,
-        FilenamePolicy filenamePolicy,
-        boolean windowedWrites) {
-      super(name);
-      this.header = header;
-      this.footer = footer;
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.writableByteChannelFactory =
-          firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
-      this.filenamePolicy = filenamePolicy;
-      this.windowedWrites = windowedWrites;
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+      abstract Builder setFilenameSuffix(String filenameSuffix);
+      abstract Builder setHeader(String header);
+      abstract Builder setFooter(String footer);
+      abstract Builder setNumShards(int numShards);
+      abstract Builder setShardTemplate(String shardTemplate);
+      abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
+      abstract Builder setWindowedWrites(boolean windowedWrites);
+      abstract Builder setWritableByteChannelFactory(
+          WritableByteChannelFactory writableByteChannelFactory);
+
+      abstract Write build();
     }
 
     /**
@@ -305,21 +306,17 @@ public class TextIO {
      */
     public Write to(String filenamePrefix) {
       validateOutputComponent(filenamePrefix);
-      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
-          header, footer, numShards, shardTemplate,
-          writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return to(StaticValueProvider.of(filenamePrefix));
     }
 
     /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
     public Write to(ValueProvider<String> filenamePrefix) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
     public Write to(FilenamePolicy filenamePolicy) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
@@ -329,8 +326,7 @@ public class TextIO {
      */
     public Write withSuffix(String nameExtension) {
       validateOutputComponent(nameExtension);
-      return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenameSuffix(nameExtension).build();
     }
 
     /**
@@ -346,8 +342,7 @@ public class TextIO {
      */
     public Write withNumShards(int numShards) {
       checkArgument(numShards >= 0);
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
@@ -356,8 +351,7 @@ public class TextIO {
      * @see ShardNameTemplate
      */
     public Write withShardNameTemplate(String shardTemplate) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
@@ -371,8 +365,7 @@ public class TextIO {
      * {@code .withNumShards(1).withShardNameTemplate("")}
      */
     public Write withoutSharding() {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
-          writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return withNumShards(1).withShardNameTemplate("");
     }
 
     /**
@@ -381,8 +374,7 @@ public class TextIO {
      * <p>A {@code null} value will clear any previously configured header.
      */
     public Write withHeader(@Nullable String header) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setHeader(header).build();
     }
 
     /**
@@ -391,8 +383,7 @@ public class TextIO {
      * <p>A {@code null} value will clear any previously configured footer.
      */
     public Write withFooter(@Nullable String footer) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFooter(footer).build();
     }
 
     /**
@@ -404,38 +395,47 @@ public class TextIO {
      */
     public Write withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
 
     public Write withWindowedWrites() {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+      return toBuilder().setWindowedWrites(true).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
-      if (filenamePolicy == null && filenamePrefix == null) {
+      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of an TextIO.Write transform");
       }
-      if (filenamePolicy != null && filenamePrefix != null) {
+      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
       WriteFiles<String> write = null;
-      if (filenamePolicy != null) {
-       write = WriteFiles.to(
-           new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+      if (getFilenamePolicy() != null) {
+        write =
+            WriteFiles.to(
+                new TextSink(
+                    getFilenamePolicy(),
+                    getHeader(),
+                    getFooter(),
+                    getWritableByteChannelFactory()));
       } else {
-        write = WriteFiles.to(
-            new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                writableByteChannelFactory));
+        write =
+            WriteFiles.to(
+                new TextSink(
+                    getFilenamePrefix(),
+                    getFilenameSuffix(),
+                    getHeader(),
+                    getFooter(),
+                    getShardTemplate(),
+                    getWritableByteChannelFactory()));
       }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         write = write.withWindowedWrites();
       }
       return input.apply("WriteFiles", write);
@@ -446,66 +446,33 @@ public class TextIO {
       super.populateDisplayData(builder);
 
       String prefixString = "";
-      if (filenamePrefix != null) {
-        prefixString = filenamePrefix.isAccessible()
-            ? filenamePrefix.get() : filenamePrefix.toString();
+      if (getFilenamePrefix() != null) {
+        prefixString = getFilenamePrefix().isAccessible()
+            ? getFilenamePrefix().get() : getFilenamePrefix().toString();
       }
       builder
           .addIfNotNull(DisplayData.item("filePrefix", prefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
             .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
             .withLabel("Output Shard Name Template"),
               DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("numShards", numShards)
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
             .withLabel("Maximum Output Shards"), 0)
-          .addIfNotNull(DisplayData.item("fileHeader", header)
+          .addIfNotNull(DisplayData.item("fileHeader", getHeader())
             .withLabel("File Header"))
-          .addIfNotNull(DisplayData.item("fileFooter", footer)
+          .addIfNotNull(DisplayData.item("fileFooter", getFooter())
               .withLabel("File Footer"))
           .add(DisplayData
-              .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+              .item("writableByteChannelFactory", getWritableByteChannelFactory().toString())
               .withLabel("Compression/Transformation Type"));
     }
 
-    /**
-     * Returns the current shard name template string.
-     */
-    public String getShardNameTemplate() {
-      return shardTemplate;
-    }
-
     @Override
     protected Coder<Void> getDefaultOutputCoder() {
       return VoidCoder.of();
     }
-
-    public String getFilenamePrefix() {
-      return filenamePrefix.get();
-    }
-
-    public String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    public int getNumShards() {
-      return numShards;
-    }
-
-    public String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-
-    @Nullable
-    public String getHeader() {
-      return header;
-    }
-
-    @Nullable
-    public String getFooter() {
-      return footer;
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 095b69f..425e0d6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -315,7 +315,7 @@ public class TextIOTest {
     p.run();
 
     assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        write.getShardNameTemplate());
+        write.getShardTemplate());
   }
 
   public static void assertOutputFiles(
@@ -478,7 +478,7 @@ public class TextIOTest {
       drunkElems.add(elem);
     }
     assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir,
-        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate());
+        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate());
   }
 
   @Test


[7/7] beam git commit: This closes #2774

Posted by jk...@apache.org.
This closes #2774


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3161904d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3161904d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3161904d

Branch: refs/heads/master
Commit: 3161904d97e9eb4ec6eb354fb54d8d0e4e5733eb
Parents: c2c89ed 7b725c2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue May 2 12:20:24 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:24 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |    2 +-
 .../apache/beam/examples/MinimalWordCount.java  |    4 +-
 .../apache/beam/examples/WindowedWordCount.java |    2 +-
 .../org/apache/beam/examples/WordCount.java     |    4 +-
 .../examples/common/WriteOneFilePerWindow.java  |    2 +-
 .../beam/examples/complete/AutoComplete.java    |    2 +-
 .../examples/complete/StreamingWordExtract.java |    2 +-
 .../apache/beam/examples/complete/TfIdf.java    |    4 +-
 .../examples/complete/TopWikipediaSessions.java |    4 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |    2 +-
 .../beam/examples/complete/TrafficRoutes.java   |    2 +-
 .../beam/examples/cookbook/DistinctExample.java |    4 +-
 .../beam/examples/cookbook/JoinExamples.java    |    2 +-
 .../beam/examples/cookbook/TriggerExample.java  |    2 +-
 .../beam/examples/MinimalWordCountJava8.java    |    4 +-
 .../examples/complete/game/HourlyTeamScore.java |    2 +-
 .../beam/examples/complete/game/UserScore.java  |    2 +-
 .../examples/MinimalWordCountJava8Test.java     |    4 +-
 .../runners/apex/examples/WordCountTest.java    |    4 +-
 .../direct/WriteWithShardingFactoryTest.java    |    2 +-
 .../beam/runners/flink/ReadSourceITCase.java    |    2 +-
 .../flink/ReadSourceStreamingITCase.java        |    2 +-
 .../flink/streaming/GroupByNullKeyTest.java     |    2 +-
 .../streaming/TopWikipediaSessionsITCase.java   |    2 +-
 .../DataflowPipelineTranslatorTest.java         |   16 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   23 +-
 .../beam/runners/spark/examples/WordCount.java  |    4 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |    4 +-
 .../beam/runners/spark/io/NumShardsTest.java    |    3 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |    6 +-
 .../apache/beam/sdk/io/ShardNameTemplate.java   |    2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 1159 +++++-------------
 .../java/org/apache/beam/sdk/io/TextSink.java   |  139 +++
 .../java/org/apache/beam/sdk/io/TextSource.java |  236 ++++
 .../org/apache/beam/sdk/io/package-info.java    |    2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   61 +-
 .../beam/sdk/runners/TransformTreeTest.java     |    6 +-
 .../display/DisplayDataEvaluatorTest.java       |    5 +-
 .../sdk/transforms/windowing/WindowingTest.java |    2 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java |    2 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |    2 +-
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  |    3 +-
 42 files changed, 753 insertions(+), 986 deletions(-)
----------------------------------------------------------------------



[3/7] beam git commit: Removes TextIO.Read.Bound

Posted by jk...@apache.org.
Removes TextIO.Read.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96315203
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96315203
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96315203

Branch: refs/heads/master
Commit: 96315203284ff60b10210d73b81b65ea0a395544
Parents: ef4658a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:06:46 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../apache/beam/examples/MinimalWordCount.java  |   2 +-
 .../apache/beam/examples/WindowedWordCount.java |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   2 +-
 .../beam/examples/complete/AutoComplete.java    |   2 +-
 .../examples/complete/StreamingWordExtract.java |   2 +-
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/complete/TrafficMaxLaneFlow.java   |   2 +-
 .../beam/examples/complete/TrafficRoutes.java   |   2 +-
 .../beam/examples/cookbook/DistinctExample.java |   2 +-
 .../beam/examples/cookbook/TriggerExample.java  |   2 +-
 .../beam/examples/MinimalWordCountJava8.java    |   2 +-
 .../examples/complete/game/HourlyTeamScore.java |   2 +-
 .../beam/examples/complete/game/UserScore.java  |   2 +-
 .../examples/MinimalWordCountJava8Test.java     |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../DataflowPipelineTranslatorTest.java         |  12 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  15 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |   4 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 247 ++++++++-----------
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  34 +--
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 .../display/DisplayDataEvaluatorTest.java       |   2 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  |   3 +-
 27 files changed, 158 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index e6e3a92..06af209 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -136,7 +136,7 @@ public class DebuggingWordCount {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
          .apply(new WordCount.CountWords())
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index cf72672..4a0c1bb 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -75,7 +75,7 @@ public class MinimalWordCount {
     // the input text (a set of Shakespeare's texts).
 
     // This example reads a public data set consisting of the complete works of Shakespeare.
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
 
      // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
      // DoFn (defined in-line) on each element that tokenizes the text line into individual words.

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index d88de54..5c64c53 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -178,7 +178,7 @@ public class WindowedWordCount {
      */
     PCollection<String> input = pipeline
       /** Read from the GCS file. */
-      .apply(TextIO.Read.from(options.getInputFile()))
+      .apply(TextIO.read().from(options.getInputFile()))
       // Concept #2: Add an element timestamp, using an artificial time just to show windowing.
       // See AddTimestampFn for more detail on this.
       .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index b64d2c1..e331a86 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -176,7 +176,7 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index e6621ce..bd69855 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -469,7 +469,7 @@ public class AutoComplete {
     // Create the pipeline.
     Pipeline p = Pipeline.create(options);
     PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
-      .apply(TextIO.Read.from(options.getInputFile()))
+      .apply(TextIO.read().from(options.getInputFile()))
       .apply(ParDo.of(new ExtractHashtags()))
       .apply(Window.<String>into(windowFn))
       .apply(ComputeTopCompletions.top(10, options.getRecursive()));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index 21a9849..f35d67a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -132,7 +132,7 @@ public class StreamingWordExtract {
         .append(options.getBigQueryTable())
         .toString();
     pipeline
-        .apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+        .apply("ReadLines", TextIO.read().from(options.getInputFile()))
         .apply(ParDo.of(new ExtractWords()))
         .apply(ParDo.of(new Uppercase()))
         .apply(ParDo.of(new StringToRowConverter()))

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 2e1be90..1ef69c0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -179,7 +179,7 @@ public class TfIdf {
         }
 
         PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
+            .apply("TextIO.Read(" + uriString + ")", TextIO.read().from(uriString))
             .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
 
         urisToLines = urisToLines.and(oneUriToLines);

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 4c07ca4..bb8c8bc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -207,7 +207,7 @@ public class TopWikipediaSessions {
 
     double samplingThreshold = 0.1;
 
-    p.apply(TextIO.Read.from(options.getInput()))
+    p.apply(TextIO.read().from(options.getInput()))
         .apply(MapElements.via(new ParseTableRowJson()))
         .apply(new ComputeTopSessions(samplingThreshold))
         .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index e57da93..d7c933e 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -291,7 +291,7 @@ public class TrafficMaxLaneFlow {
     @Override
     public PCollection<String> expand(PBegin begin) {
       return begin
-          .apply(TextIO.Read.from(inputFile))
+          .apply(TextIO.read().from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index b1f938b..c9ba18c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -301,7 +301,7 @@ public class TrafficRoutes {
     @Override
     public PCollection<String> expand(PBegin begin) {
       return begin
-          .apply(TextIO.Read.from(inputFile))
+          .apply(TextIO.read().from(inputFile))
           .apply(ParDo.of(new ExtractTimestamps()));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 9670b7f..20c8fa0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -87,7 +87,7 @@ public class DistinctExample {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
+    p.apply("ReadLines", TextIO.read().from(options.getInput()))
      .apply(Distinct.<String>create())
      .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 49d5eda..e7596aa 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -446,7 +446,7 @@ public class TriggerExample {
         options.getBigQueryDataset(), options.getBigQueryTable());
 
     PCollectionList<TableRow> resultList = pipeline
-        .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
+        .apply("ReadMyFile", TextIO.read().from(options.getInput()))
         .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
         .apply(ParDo.of(new ExtractFlowInfo()))
         .apply(new CalculateTotalFlow(options.getWindowDuration()));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6badb75..6dabadc 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -55,7 +55,7 @@ public class MinimalWordCountJava8 {
 
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
      .apply(FlatMapElements
          .into(TypeDescriptors.strings())
          .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index b905d61..3f1ffb0 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -155,7 +155,7 @@ public class HourlyTeamScore extends UserScore {
     final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));
 
     // Read 'gaming' events from a text file.
-    pipeline.apply(TextIO.Read.from(options.getInput()))
+    pipeline.apply(TextIO.read().from(options.getInput()))
       // Parse the incoming data.
       .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 0adaabc..c136c2e 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -226,7 +226,7 @@ public class UserScore {
     Pipeline pipeline = Pipeline.create(options);
 
     // Read events from a text file and parse them.
-    pipeline.apply(TextIO.Read.from(options.getInput()))
+    pipeline.apply(TextIO.read().from(options.getInput()))
       .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
       // Extract and sum username/score pairs from the event data.
       .apply("ExtractUserScore", new ExtractAndSumScore("user"))

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index f3becf9..689005a 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -62,7 +62,7 @@ public class MinimalWordCountJava8Test implements Serializable {
   public void testMinimalWordCountJava8() throws Exception {
     p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
 
-    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
+    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
      .apply(FlatMapElements
          .into(TypeDescriptors.strings())
          .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b980715..b0fab0b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -106,7 +106,7 @@ public class WordCountTest {
     WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
       .as(WordCountOptions.class);
     Pipeline p = Pipeline.create(options);
-    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index fcd23cf..d47da45 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -137,7 +137,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     // Enable the FileSystems API to know about gs:// URIs in this test.
     FileSystems.setDefaultConfigInWorkers(options);
 
-    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+    p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
      .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(p);
@@ -465,7 +465,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     // Create a pipeline that the predefined step will be embedded into
     Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+    pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
         .apply(ParDo.of(new NoOpFn()));
@@ -523,7 +523,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
     Pipeline pipeline = Pipeline.create(options);
     String stepName = "DoFn1";
-    pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
+    pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
         .apply(stepName, ParDo.of(new NoOpFn()))
         .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
@@ -723,7 +723,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
   }
 
   private void applyRead(Pipeline pipeline, String path) {
-    pipeline.apply("Read(" + path + ")", TextIO.Read.from(path));
+    pipeline.apply("Read(" + path + ")", TextIO.read().from(path));
   }
 
   /**
@@ -736,7 +736,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline pipeline = Pipeline.create(options);
     DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 
-    pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+    pipeline.apply(TextIO.read().from("gs://bucket/foo**/baz"));
 
     // Check that translation does fail.
     thrown.expectCause(allOf(
@@ -766,7 +766,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     Pipeline pipeline = Pipeline.create(options);
     DataflowPipelineTranslator t = DataflowPipelineTranslator.fromOptions(options);
 
-    pipeline.apply(TextIO.Read.from(new TestValueProvider()));
+    pipeline.apply(TextIO.read().from(new TestValueProvider()));
 
     // Check that translation does not fail.
     t.translate(

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 480591e..7261fe9 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.TextIO.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -172,7 +171,7 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+    p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
         .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
@@ -335,7 +334,7 @@ public class DataflowRunnerTest {
     RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
     Pipeline p = buildDataflowPipeline(dataflowOptions);
     p
-        .apply(TextIO.Read.from(options.getInput()))
+        .apply(TextIO.read().from(options.getInput()))
         .apply(TextIO.Write.to(options.getOutput()));
   }
 
@@ -347,7 +346,7 @@ public class DataflowRunnerTest {
     DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
     RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
     Pipeline p = buildDataflowPipeline(dataflowOptions);
-    PCollection<String> unconsumed = p.apply(Read.from(options.getInput()));
+    PCollection<String> unconsumed = p.apply(TextIO.read().from(options.getInput()));
     DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
     final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
     p.traverseTopologically(new PipelineVisitor.Defaults() {
@@ -570,7 +569,7 @@ public class DataflowRunnerTest {
   @Test
   public void testNonGcsFilePathInReadFailure() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
+    p.apply("ReadMyNonGcsFile", TextIO.read().from(tmpFolder.newFile().getPath()));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -587,7 +586,7 @@ public class DataflowRunnerTest {
   public void testNonGcsFilePathInWriteFailure() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
 
-    p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"))
+    p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
         .apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
 
     thrown.expect(IllegalArgumentException.class);
@@ -598,7 +597,7 @@ public class DataflowRunnerTest {
   @Test
   public void testMultiSlashGcsFileReadPath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
+    p.apply("ReadInvalidGcsFile", TextIO.read().from("gs://bucket/tmp//file"));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -613,7 +612,7 @@ public class DataflowRunnerTest {
   @Test
   public void testMultiSlashGcsFileWritePath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
+    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
     pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
 
     thrown.expect(IllegalArgumentException.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 0e6faad..2bcf140 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -125,7 +125,7 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index d578a7a..7cb9386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -79,11 +79,11 @@ import org.slf4j.LoggerFactory;
  * // A root PTransform, like TextIO.Read or Create, gets added
  * // to the Pipeline by being applied:
  * PCollection<String> lines =
- *     p.apply(TextIO.Read.from("gs://bucket/dir/file*.txt"));
+ *     p.apply(TextIO.read().from("gs://bucket/dir/file*.txt"));
  *
  * // A Pipeline can have multiple root transforms:
  * PCollection<String> moreLines =
- *     p.apply(TextIO.Read.from("gs://bucket/other/dir/file*.txt"));
+ *     p.apply(TextIO.read().from("gs://bucket/other/dir/file*.txt"));
  * PCollection<String> yetMoreLines =
  *     p.apply(Create.of("yet", "more", "lines").withCoder(StringUtf8Coder.of()));
  *

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f735019..31d2c3d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -73,7 +73,7 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // A simple Read of a local file (only runs locally):
  * PCollection<String> lines =
- *     p.apply(TextIO.Read.from("/local/path/to/file.txt"));
+ *     p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more text files, use
@@ -109,174 +109,131 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class TextIO {
   /**
-   * A {@link PTransform} that reads from a text file (or multiple text
-   * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each of the lines of the text file(s) as a {@link String}.
+   * Reads from one or more text files and returns a bounded {@link PCollection} containing one
+   * element for each line of the input files.
    */
-  public static class Read {
+  public static Read read() {
+    return new Read();
+  }
 
-    /**
-     * Returns a transform for reading text files that reads from the file(s)
-     * with the given filename or filename pattern. This can be a local path (if running locally),
-     * or a Google Cloud Storage filename or filename pattern of the form
-     * {@code "gs://<bucket>/<filepath>"} (if running locally or using remote execution)
-     * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
-     * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
-    public static Bound from(String filepattern) {
-      return new Bound().from(filepattern);
+  /** Implementation of {@link #read}. */
+  public static class Read extends PTransform<PBegin, PCollection<String>> {
+    /** The filepattern to read from. */
+    @Nullable private final ValueProvider<String> filepattern;
+
+    /** Option to indicate the input source's compression type. Default is AUTO. */
+    private final TextIO.CompressionType compressionType;
+
+    private Read() {
+      this(null, null, TextIO.CompressionType.AUTO);
     }
 
-    /**
-     * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
-     */
-    public static Bound from(ValueProvider<String> filepattern) {
-      return new Bound().from(filepattern);
+    private Read(
+        @Nullable String name,
+        @Nullable ValueProvider<String> filepattern,
+        TextIO.CompressionType compressionType) {
+      super(name);
+      this.filepattern = filepattern;
+      this.compressionType = compressionType;
     }
 
     /**
-     * Returns a transform for reading text files that decompresses all input files
-     * using the specified compression type.
+     * Reads text files that reads from the file(s) with the given filename or filename pattern.
      *
-     * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-     * In this mode, the compression type of the file is determined by its extension
-     * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are
-     * uncompressed).
+     * <p>This can be a local path (if running locally), or a Google Cloud Storage filename or
+     * filename pattern of the form {@code "gs://<bucket>/<filepath>"} (if running locally or using
+     * remote execution service).
+     *
+     * <p>Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html" >Java
+     * Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
      */
-    public static Bound withCompressionType(TextIO.CompressionType compressionType) {
-      return new Bound().withCompressionType(compressionType);
+    public Read from(String filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return new Read(name, StaticValueProvider.of(filepattern), compressionType);
     }
 
-    // TODO: strippingNewlines, etc.
+    /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
+    public Read from(ValueProvider<String> filepattern) {
+      checkNotNull(filepattern, "Filepattern cannot be empty.");
+      return new Read(name, filepattern, compressionType);
+    }
 
     /**
-     * A {@link PTransform} that reads from one or more text files and returns a bounded
-     * {@link PCollection} containing one element for each line of the input files.
+     * Returns a new transform for reading from text files that's like this one but
+     * reads from input sources using the specified compression type.
+     *
+     * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
      */
-    public static class Bound extends PTransform<PBegin, PCollection<String>> {
-      /** The filepattern to read from. */
-      @Nullable private final ValueProvider<String> filepattern;
-
-      /** Option to indicate the input source's compression type. Default is AUTO. */
-      private final TextIO.CompressionType compressionType;
-
-      private Bound() {
-        this(null, null, TextIO.CompressionType.AUTO);
-      }
-
-      private Bound(
-          @Nullable String name,
-          @Nullable ValueProvider<String> filepattern,
-          TextIO.CompressionType compressionType) {
-        super(name);
-        this.filepattern = filepattern;
-        this.compressionType = compressionType;
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
-       * for a description of filepatterns.
-       *
-       * <p>Does not modify this object.
-
-       */
-      public Bound from(String filepattern) {
-        checkNotNull(filepattern, "Filepattern cannot be empty.");
-        return new Bound(name, StaticValueProvider.of(filepattern), compressionType);
-      }
-
-      /**
-       * Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}.
-       */
-      public Bound from(ValueProvider<String> filepattern) {
-        checkNotNull(filepattern, "Filepattern cannot be empty.");
-        return new Bound(name, filepattern, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * reads from input sources using the specified compression type.
-       *
-       * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-       * See {@link TextIO.Read#withCompressionType} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withCompressionType(TextIO.CompressionType compressionType) {
-        return new Bound(name, filepattern, compressionType);
-      }
-
-      @Override
-      public PCollection<String> expand(PBegin input) {
-        if (filepattern == null) {
-          throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
-        }
-
-        final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
-        PCollection<String> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
-      }
+    public Read withCompressionType(TextIO.CompressionType compressionType) {
+      return new Read(name, filepattern, compressionType);
+    }
 
-      // Helper to create a source specific to the requested compression type.
-      protected FileBasedSource<String> getSource() {
-        switch (compressionType) {
-          case UNCOMPRESSED:
-            return new TextSource(filepattern);
-          case AUTO:
-            return CompressedSource.from(new TextSource(filepattern));
-          case BZIP2:
-            return
-                CompressedSource.from(new TextSource(filepattern))
-                    .withDecompression(CompressedSource.CompressionMode.BZIP2);
-          case GZIP:
-            return
-                CompressedSource.from(new TextSource(filepattern))
-                    .withDecompression(CompressedSource.CompressionMode.GZIP);
-          case ZIP:
-            return
-                CompressedSource.from(new TextSource(filepattern))
-                    .withDecompression(CompressedSource.CompressionMode.ZIP);
-          case DEFLATE:
-            return
-                CompressedSource.from(new TextSource(filepattern))
-                    .withDecompression(CompressedSource.CompressionMode.DEFLATE);
-          default:
-            throw new IllegalArgumentException("Unknown compression type: " + compressionType);
-        }
+    @Override
+    public PCollection<String> expand(PBegin input) {
+      if (filepattern == null) {
+        throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
+      final Bounded<String> read = org.apache.beam.sdk.io.Read.from(getSource());
+      PCollection<String> pcol = input.getPipeline().apply("Read", read);
+      // Honor the default output coder that would have been used by this PTransform.
+      pcol.setCoder(getDefaultOutputCoder());
+      return pcol;
+    }
 
-        String filepatternDisplay = filepattern.isAccessible()
-          ? filepattern.get() : filepattern.toString();
-        builder
-            .add(DisplayData.item("compressionType", compressionType.toString())
-              .withLabel("Compression Type"))
-            .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
-              .withLabel("File Pattern"));
+    // Helper to create a source specific to the requested compression type.
+    protected FileBasedSource<String> getSource() {
+      switch (compressionType) {
+        case UNCOMPRESSED:
+          return new TextSource(filepattern);
+        case AUTO:
+          return CompressedSource.from(new TextSource(filepattern));
+        case BZIP2:
+          return
+              CompressedSource.from(new TextSource(filepattern))
+                  .withDecompression(CompressedSource.CompressionMode.BZIP2);
+        case GZIP:
+          return
+              CompressedSource.from(new TextSource(filepattern))
+                  .withDecompression(CompressedSource.CompressionMode.GZIP);
+        case ZIP:
+          return
+              CompressedSource.from(new TextSource(filepattern))
+                  .withDecompression(CompressedSource.CompressionMode.ZIP);
+        case DEFLATE:
+          return
+              CompressedSource.from(new TextSource(filepattern))
+                  .withDecompression(CompressedSource.CompressionMode.DEFLATE);
+        default:
+          throw new IllegalArgumentException("Unknown compression type: " + compressionType);
       }
+    }
 
-      @Override
-      protected Coder<String> getDefaultOutputCoder() {
-        return StringUtf8Coder.of();
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      String filepatternDisplay = filepattern.isAccessible()
+        ? filepattern.get() : filepattern.toString();
+      builder
+          .add(DisplayData.item("compressionType", compressionType.toString())
+            .withLabel("Compression Type"))
+          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
+            .withLabel("File Pattern"));
+    }
 
-      public String getFilepattern() {
-        return filepattern.get();
-      }
+    @Override
+    protected Coder<String> getDefaultOutputCoder() {
+      return StringUtf8Coder.of();
+    }
 
-      public TextIO.CompressionType getCompressionType() {
-        return compressionType;
-      }
+    public String getFilepattern() {
+      return filepattern.get();
     }
 
-    /** Disallow construction of utility classes. */
-    private Read() {}
+    public TextIO.CompressionType getCompressionType() {
+      return compressionType;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 2ba1797..8a7965c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -220,7 +220,7 @@ public class TextIOTest {
       }
     }
 
-    TextIO.Read.Bound read = TextIO.Read.from(filename);
+    TextIO.Read read = TextIO.read().from(filename);
 
     PCollection<String> output = p.apply(read);
 
@@ -246,15 +246,15 @@ public class TextIOTest {
 
     assertEquals(
         "TextIO.Read/Read.out",
-        p.apply(TextIO.Read.from("somefile")).getName());
+        p.apply(TextIO.read().from("somefile")).getName());
     assertEquals(
         "MyRead/Read.out",
-        p.apply("MyRead", TextIO.Read.from(emptyTxt.getPath())).getName());
+        p.apply("MyRead", TextIO.read().from(emptyTxt.getPath())).getName());
   }
 
   @Test
   public void testReadDisplayData() {
-    TextIO.Read.Bound read = TextIO.Read
+    TextIO.Read read = TextIO.read()
         .from("foo.*")
         .withCompressionType(BZIP2);
 
@@ -269,7 +269,7 @@ public class TextIOTest {
   public void testPrimitiveReadDisplayData() {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    TextIO.Read.Bound read = TextIO.Read
+    TextIO.Read read = TextIO.read()
         .from("foobar");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
@@ -572,15 +572,15 @@ public class TextIOTest {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
 
     p
-        .apply(TextIO.Read.from(options.getInput()))
+        .apply(TextIO.read().from(options.getInput()))
         .apply(TextIO.Write.to(options.getOutput()));
   }
 
   @Test
   public void testCompressionTypeIsSet() throws Exception {
-    TextIO.Read.Bound read = TextIO.Read.from("/tmp/test");
+    TextIO.Read read = TextIO.read().from("/tmp/test");
     assertEquals(AUTO, read.getCompressionType());
-    read = TextIO.Read.from("/tmp/test").withCompressionType(GZIP);
+    read = TextIO.read().from("/tmp/test").withCompressionType(GZIP);
     assertEquals(GZIP, read.getCompressionType());
   }
 
@@ -597,14 +597,14 @@ public class TextIOTest {
   }
 
   /**
-   * Helper method that runs TextIO.Read.from(filename).withCompressionType(compressionType)
+   * Helper method that runs TextIO.read().from(filename).withCompressionType(compressionType)
    * and asserts that the results match the given expected output.
    */
   private void assertReadingCompressedFileMatchesExpected(
       File file, CompressionType compressionType, String[] expected) {
 
-    TextIO.Read.Bound read =
-        TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
+    TextIO.Read read =
+        TextIO.read().from(file.getPath()).withCompressionType(compressionType);
     PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read);
 
     PAssert.that(output).containsInAnyOrder(expected);
@@ -825,9 +825,9 @@ public class TextIOTest {
 
   @Test
   public void testTextIOGetName() {
-    assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
+    assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
     assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
-    assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
+    assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
   }
 
   @Test
@@ -1075,7 +1075,7 @@ public class TextIOTest {
     // Sanity check: file is at least 2 bundles long.
     assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
-    FileBasedSource<String> source = TextIO.Read.from(largeTxt.getPath()).getSource();
+    FileBasedSource<String> source = TextIO.read().from(largeTxt.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.split(desiredBundleSize, options);
 
@@ -1092,7 +1092,7 @@ public class TextIOTest {
     // Sanity check: file is at least 2 bundles long.
     assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
 
-    FileBasedSource<String> source = TextIO.Read.from(largeGz.getPath()).getSource();
+    FileBasedSource<String> source = TextIO.read().from(largeGz.getPath()).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.split(desiredBundleSize, options);
 
@@ -1110,7 +1110,7 @@ public class TextIOTest {
     assertThat(largeTxt.length(), greaterThan(2 * desiredBundleSize));
 
     FileBasedSource<String> source =
-        TextIO.Read.from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
+        TextIO.read().from(largeTxt.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.split(desiredBundleSize, options);
 
@@ -1128,7 +1128,7 @@ public class TextIOTest {
     assertThat(largeGz.length(), greaterThan(2 * desiredBundleSize));
 
     FileBasedSource<String> source =
-        TextIO.Read.from(largeGz.getPath()).withCompressionType(GZIP).getSource();
+        TextIO.read().from(largeGz.getPath()).withCompressionType(GZIP).getSource();
     List<? extends FileBasedSource<String>> splits =
         source.split(desiredBundleSize, options);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 07b6b4a..29d9774 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -121,7 +121,7 @@ public class TransformTreeTest {
 
     final PTransform<PCollection<String>, PCollection<Iterable<String>>> sample =
         Sample.fixedSizeGlobally(10);
-    p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
+    p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
         .apply(sample)
         .apply(Flatten.<String>iterables())
         .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
@@ -167,7 +167,7 @@ public class TransformTreeTest {
             assertThat(transform, not(instanceOf(Combine.Globally.class)));
             assertThat(transform, not(instanceOf(WriteFiles.class)));
             if (transform instanceof Read.Bounded
-                && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
+                && node.getEnclosingNode().getTransform() instanceof TextIO.Read) {
               assertTrue(visited.add(TransformsSeen.READ));
             }
           }

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index 6937405..f3dc378 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -94,7 +94,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
 
   @Test
   public void testSourceTransform() {
-    PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.Read
+    PTransform<? super PBegin, ? extends POutput> myTransform = TextIO.read()
         .from("foo.*");
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 30b0311..5e6580b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -215,7 +215,7 @@ public class WindowingTest implements Serializable {
     }
 
     PCollection<String> output = p.begin()
-        .apply("ReadLines", TextIO.Read.from(filename))
+        .apply("ReadLines", TextIO.read().from(filename))
         .apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
         .apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96315203/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
index a5957b5..cf86c36 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -221,7 +221,8 @@ public class HadoopFileSystemTest {
         .as(HadoopFileSystemOptions.class);
     options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
     FileSystems.setDefaultConfigInWorkers(options);
-    PCollection<String> pc = p.apply(TextIO.Read.from(testPath("testFile*").toString()));
+    PCollection<String> pc = p.apply(
+        TextIO.read().from(testPath("testFile*").toString()));
     PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
     p.run();
   }


[5/7] beam git commit: Converts TextIO.Read to AutoValue

Posted by jk...@apache.org.
Converts TextIO.Read to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/987b4e62
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/987b4e62
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/987b4e62

Branch: refs/heads/master
Commit: 987b4e626e9b5113778310dcb23b0b2d6c666194
Parents: 9631520
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 28 17:16:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 66 ++++++++------------
 1 file changed, 27 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/987b4e62/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 31d2c3d..f8670a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
@@ -113,28 +114,23 @@ public class TextIO {
    * element for each line of the input files.
    */
   public static Read read() {
-    return new Read();
+    return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
   }
 
   /** Implementation of {@link #read}. */
-  public static class Read extends PTransform<PBegin, PCollection<String>> {
-    /** The filepattern to read from. */
-    @Nullable private final ValueProvider<String> filepattern;
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
+    @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract CompressionType getCompressionType();
 
-    /** Option to indicate the input source's compression type. Default is AUTO. */
-    private final TextIO.CompressionType compressionType;
+    abstract Builder toBuilder();
 
-    private Read() {
-      this(null, null, TextIO.CompressionType.AUTO);
-    }
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setCompressionType(CompressionType compressionType);
 
-    private Read(
-        @Nullable String name,
-        @Nullable ValueProvider<String> filepattern,
-        TextIO.CompressionType compressionType) {
-      super(name);
-      this.filepattern = filepattern;
-      this.compressionType = compressionType;
+      abstract Read build();
     }
 
     /**
@@ -149,13 +145,13 @@ public class TextIO {
      */
     public Read from(String filepattern) {
       checkNotNull(filepattern, "Filepattern cannot be empty.");
-      return new Read(name, StaticValueProvider.of(filepattern), compressionType);
+      return from(StaticValueProvider.of(filepattern));
     }
 
     /** Same as {@code from(filepattern)}, but accepting a {@link ValueProvider}. */
     public Read from(ValueProvider<String> filepattern) {
       checkNotNull(filepattern, "Filepattern cannot be empty.");
-      return new Read(name, filepattern, compressionType);
+      return toBuilder().setFilepattern(filepattern).build();
     }
 
     /**
@@ -165,12 +161,12 @@ public class TextIO {
      * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
      */
     public Read withCompressionType(TextIO.CompressionType compressionType) {
-      return new Read(name, filepattern, compressionType);
+      return toBuilder().setCompressionType(compressionType).build();
     }
 
     @Override
     public PCollection<String> expand(PBegin input) {
-      if (filepattern == null) {
+      if (getFilepattern() == null) {
         throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
       }
 
@@ -183,29 +179,29 @@ public class TextIO {
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      switch (compressionType) {
+      switch (getCompressionType()) {
         case UNCOMPRESSED:
-          return new TextSource(filepattern);
+          return new TextSource(getFilepattern());
         case AUTO:
-          return CompressedSource.from(new TextSource(filepattern));
+          return CompressedSource.from(new TextSource(getFilepattern()));
         case BZIP2:
           return
-              CompressedSource.from(new TextSource(filepattern))
+              CompressedSource.from(new TextSource(getFilepattern()))
                   .withDecompression(CompressedSource.CompressionMode.BZIP2);
         case GZIP:
           return
-              CompressedSource.from(new TextSource(filepattern))
+              CompressedSource.from(new TextSource(getFilepattern()))
                   .withDecompression(CompressedSource.CompressionMode.GZIP);
         case ZIP:
           return
-              CompressedSource.from(new TextSource(filepattern))
+              CompressedSource.from(new TextSource(getFilepattern()))
                   .withDecompression(CompressedSource.CompressionMode.ZIP);
         case DEFLATE:
           return
-              CompressedSource.from(new TextSource(filepattern))
+              CompressedSource.from(new TextSource(getFilepattern()))
                   .withDecompression(CompressedSource.CompressionMode.DEFLATE);
         default:
-          throw new IllegalArgumentException("Unknown compression type: " + compressionType);
+          throw new IllegalArgumentException("Unknown compression type: " + getFilepattern());
       }
     }
 
@@ -213,10 +209,10 @@ public class TextIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String filepatternDisplay = filepattern.isAccessible()
-        ? filepattern.get() : filepattern.toString();
+      String filepatternDisplay = getFilepattern().isAccessible()
+        ? getFilepattern().get() : getFilepattern().toString();
       builder
-          .add(DisplayData.item("compressionType", compressionType.toString())
+          .add(DisplayData.item("compressionType", getCompressionType().toString())
             .withLabel("Compression Type"))
           .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
             .withLabel("File Pattern"));
@@ -226,14 +222,6 @@ public class TextIO {
     protected Coder<String> getDefaultOutputCoder() {
       return StringUtf8Coder.of();
     }
-
-    public String getFilepattern() {
-      return filepattern.get();
-    }
-
-    public TextIO.CompressionType getCompressionType() {
-      return compressionType;
-    }
   }