Posted to commits@beam.apache.org by ke...@apache.org on 2017/04/20 21:49:30 UTC

[3/4] beam git commit: Remove Sink in favor of FileBasedSink

Remove Sink in favor of FileBasedSink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a6a1a8c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a6a1a8c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a6a1a8c

Branch: refs/heads/master
Commit: 6a6a1a8c0d39965e540dbe74ddf73c839ca46889
Parents: 8319369
Author: Reuven Lax <re...@google.com>
Authored: Wed Apr 5 12:13:44 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 20 13:49:36 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   |   6 +-
 .../construction/PTransformMatchersTest.java    |  16 +-
 .../direct/WriteWithShardingFactory.java        |  15 +-
 .../direct/WriteWithShardingFactoryTest.java    |  18 +-
 .../beam/runners/flink/WriteSinkITCase.java     | 192 -----
 .../beam/runners/dataflow/DataflowRunner.java   |  16 +-
 .../DataflowPipelineTranslatorTest.java         |   2 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   6 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |  93 +--
 .../main/java/org/apache/beam/sdk/io/Sink.java  | 295 --------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |   6 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   8 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 603 ----------------
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 555 +++++++++++++++
 .../main/java/org/apache/beam/sdk/io/XmlIO.java |  37 +-
 .../java/org/apache/beam/sdk/io/XmlSink.java    |   4 +-
 .../java/org/apache/beam/sdk/values/PDone.java  |   3 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  73 --
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  95 +++
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 457 ++++++++++++
 .../java/org/apache/beam/sdk/io/WriteTest.java  | 705 -------------------
 .../beam/sdk/runners/TransformTreeTest.java     |   6 +-
 .../apache/beam/sdk/io/hdfs/HDFSFileSink.java   |   6 +-
 .../java/org/apache/beam/sdk/io/hdfs/Sink.java  | 195 +++++
 .../java/org/apache/beam/sdk/io/hdfs/Write.java | 582 +++++++++++++++
 .../beam/sdk/io/hdfs/HDFSFileSinkTest.java      |   1 -
 26 files changed, 2014 insertions(+), 1981 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 09946bc..b2bd7d9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -22,7 +22,7 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -268,8 +268,8 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof Write) {
-          Write write = (Write) application.getTransform();
+        if (application.getTransform() instanceof WriteFiles) {
+          WriteFiles write = (WriteFiles) application.getTransform();
           return write.getSharding() == null && write.getNumShards() == null;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 4084cdc..d9bc1e7 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -30,7 +30,7 @@ import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -499,8 +499,8 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void writeWithRunnerDeterminedSharding() {
-    Write<Integer> write =
-        Write.to(
+    WriteFiles<Integer> write =
+        WriteFiles.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
               public FileBasedWriteOperation<Integer> createWriteOperation(
@@ -512,13 +512,13 @@ public class PTransformMatchersTest implements Serializable {
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));
 
-    Write<Integer> withStaticSharding = write.withNumShards(3);
+    WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));
 
-    Write<Integer> withCustomSharding =
+    WriteFiles<Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -526,9 +526,9 @@ public class PTransformMatchersTest implements Serializable {
         is(false));
   }
 
-  private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
-        "Write",
+  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
+        "WriteFiles",
         Collections.<TupleTag<?>, PValue>emptyMap(),
         Collections.<TupleTag<?>, PValue>emptyMap(),
         write,

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index a23ab94..24462e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Count;
@@ -43,18 +43,19 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms}
- * with an unspecified number of shards with a write with a specified number of shards. The number
- * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
+ * A {@link PTransformOverrideFactory} that overrides {@link WriteFiles}
+ * {@link PTransform PTransforms} with an unspecified number of shards with a write with a
+ * specified number of shards. The number of shards is the log base 10 of the number of input
+ * records, with up to 2 additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, WriteFiles<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
 
   @Override
   public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
-      AppliedPTransform<PCollection<InputT>, PDone, Write<InputT>> transform) {
+      AppliedPTransform<PCollection<InputT>, PDone, WriteFiles<InputT>> transform) {
 
     return PTransformReplacement.of(
         PTransformReplacements.getSingletonMainInput(transform),
@@ -108,7 +109,7 @@ class WriteWithShardingFactory<InputT>
 
     private int calculateShards(long totalRecords) {
       if (totalRecords == 0) {
-        // Write out at least one shard, even if there is no input.
+        // Write out at least one shard, even if there is no input.
         return 1;
       }
       // Windows get their own number of random extra shards. This is stored in a side input, so
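
The replacement transform's shard count comes from the heuristic described in the javadoc
above. As an illustrative sketch only (the committed CalculateShardsFn also folds in
per-window random extra shards from a side input, so this is not the real implementation),
the floor of 3 mirrors MIN_SHARDS_FOR_LOG and the random bound mirrors MAX_RANDOM_EXTRA_SHARDS:

    // Sketch: "log base 10 of the number of input records, with up to 2 additional
    // shards", writing at least one shard even for empty input.
    static int estimateShards(long totalRecords, java.util.Random random) {
      if (totalRecords == 0) {
        return 1;                                              // always produce one shard
      }
      int base = Math.max((int) Math.log10(totalRecords), 3);  // MIN_SHARDS_FOR_LOG
      return base + random.nextInt(3);                         // up to 2 extra shards
    }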

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 361850d..b0c9f6d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,9 +39,9 @@ import java.util.UUID;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -84,7 +84,7 @@ public class WriteWithShardingFactoryTest {
     String fileName = "resharded_write";
     String outputPath = tmp.getRoot().getAbsolutePath();
     String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
-    // TextIO is implemented in terms of the Write PTransform. When sharding is not specified,
+    // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
     // resharding should be automatically applied
     p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
 
@@ -121,10 +121,10 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    Write<Object> original = Write.to(new TestSink());
+    WriteFiles<Object> original = WriteFiles.to(new TestSink());
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
-    AppliedPTransform<PCollection<Object>, PDone, Write<Object>> originalApplication =
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
         AppliedPTransform.of(
             "write", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
 
@@ -207,12 +207,16 @@ public class WriteWithShardingFactoryTest {
     assertThat(shards, containsInAnyOrder(13));
   }
 
-  private static class TestSink extends Sink<Object> {
+  private static class TestSink extends FileBasedSink<Object> {
+    public TestSink() {
+      super("", "");
+    }
+
     @Override
     public void validate(PipelineOptions options) {}
 
     @Override
-    public WriteOperation<Object, ?> createWriteOperation(PipelineOptions options) {
+    public FileBasedWriteOperation<Object> createWriteOperation(PipelineOptions options) {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
deleted file mode 100644
index 38b790e..0000000
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink;
-
-import static org.junit.Assert.assertNotNull;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.net.URI;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.Sink;
-import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-/**
- * Tests the translation of custom Write sinks.
- */
-public class WriteSinkITCase extends JavaProgramTestBase {
-
-  protected String resultPath;
-
-  public WriteSinkITCase(){
-  }
-
-  static final String[] EXPECTED_RESULT = new String[] {
-      "Joe red 3", "Mary blue 4", "Max yellow 23"};
-
-  @Override
-  protected void preSubmit() throws Exception {
-    resultPath = getTempDirPath("result-" + System.nanoTime());
-  }
-
-  @Override
-  protected void postSubmit() throws Exception {
-    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
-  }
-
-  @Override
-  protected void testProgram() throws Exception {
-    runProgram(resultPath);
-  }
-
-  @Override
-  public void stopCluster() throws Exception {
-    try {
-      super.stopCluster();
-    } catch (final IOException ioe) {
-      if (ioe.getMessage().startsWith("Unable to delete file")) {
-        // that's ok for the test itself, just the OS playing with us on cleanup phase
-      }
-    }
-  }
-
-  private static void runProgram(String resultPath) {
-    Pipeline p = FlinkTestPipeline.createForBatch();
-
-    p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
-      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
-
-    p.run();
-  }
-
-  /**
-   * Simple custom sink which writes to a file.
-   */
-  private static class MyCustomSink extends Sink<String> {
-
-    private final String resultPath;
-
-    public MyCustomSink(String resultPath) {
-      this.resultPath = resultPath;
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      assertNotNull(options);
-    }
-
-    @Override
-    public WriteOperation<String, ?> createWriteOperation(PipelineOptions options) {
-      return new MyWriteOperation();
-    }
-
-    private class MyWriteOperation extends WriteOperation<String, String> {
-
-      @Override
-      public Coder<String> getWriterResultCoder() {
-        return StringUtf8Coder.of();
-      }
-
-      @Override
-      public void initialize(PipelineOptions options) throws Exception {
-
-      }
-
-      @Override
-      public void setWindowedWrites(boolean windowedWrites) {
-
-      }
-
-      @Override
-      public void finalize(Iterable<String> writerResults, PipelineOptions options)
-          throws Exception {
-
-      }
-
-      @Override
-      public Writer<String, String> createWriter(PipelineOptions options) throws Exception {
-        return new MyWriter();
-      }
-
-      @Override
-      public Sink<String> getSink() {
-        return MyCustomSink.this;
-      }
-
-      /**
-       * Simple Writer which writes to a file.
-       */
-      private class MyWriter extends Writer<String, String> {
-
-        private PrintWriter internalWriter;
-
-        @Override
-        public final void openWindowed(String uId,
-                                       BoundedWindow window,
-                                       PaneInfo paneInfo,
-                                       int shard,
-                                       int numShards) throws Exception {
-          throw new UnsupportedOperationException("Windowed writes not supported.");
-        }
-
-        @Override
-        public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
-          Path path = new Path(resultPath + "/" + uId);
-          FileSystem.get(new URI("file:///")).create(path, false);
-          internalWriter = new PrintWriter(new File(path.toUri()));
-        }
-
-        @Override
-        public void cleanup() throws Exception {
-
-        }
-
-        @Override
-        public void write(String value) throws Exception {
-          internalWriter.println(value);
-        }
-
-        @Override
-        public String close() throws Exception {
-          internalWriter.close();
-          return resultPath;
-        }
-
-        @Override
-        public WriteOperation<String, String> getWriteOperation() {
-          return MyWriteOperation.this;
-        }
-      }
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 4eec6b8..19ea529 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -85,7 +85,7 @@ import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -341,10 +341,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
                   PTransformMatchers.stateOrTimerParDoSingle(),
                   BatchStatefulParDoOverrides.singleOutputOverrideFactory()))
 
-          // Write uses views internally
+          // WriteFiles uses views internally
           .add(
               PTransformOverride.of(
-                  PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this)))
+                  PTransformMatchers.classEqualTo(WriteFiles.class), new BatchWriteFactory(this)))
           .add(
               PTransformOverride.of(
                   PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
@@ -803,7 +803,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   private class BatchWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
+      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
     private final DataflowRunner runner;
     private BatchWriteFactory(DataflowRunner dataflowRunner) {
       this.runner = dataflowRunner;
@@ -811,7 +811,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<T>, PDone, Write<T>> transform) {
+        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
       return PTransformReplacement.of(
           PTransformReplacements.getSingletonMainInput(transform),
           new BatchWrite<>(runner, transform.getTransform()));
@@ -826,17 +826,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   /**
    * Specialized implementation which overrides
-   * {@link org.apache.beam.sdk.io.Write Write} to provide Google
+   * {@link WriteFiles WriteFiles} to provide Google
    * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
    */
   private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
     private final DataflowRunner runner;
-    private final Write<T> transform;
+    private final WriteFiles<T> transform;
     /**
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchWrite(DataflowRunner runner, Write<T> transform) {
+    public BatchWrite(DataflowRunner runner, WriteFiles<T> transform) {
       this.runner = runner;
       this.transform = transform;
     }
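
Conceptually the replacement BatchWrite just validates the sink and then defers to the
wrapped WriteFiles. A rough sketch of the expand method that would sit in the class above;
validateSinkPath is a hypothetical stand-in for the Dataflow-specific path validation,
which is not part of this hunk:

    @Override
    public PDone expand(PCollection<T> input) {
      // Hypothetical helper: e.g. check that the FileBasedSink's output location is a
      // GCS path the job's credentials can write to.
      validateSinkPath(transform.getSink());
      // Then hand the input off to the original WriteFiles transform unchanged.
      return transform.expand(input);
    }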

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index 5016d88..b779f0f 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -532,7 +532,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
                 Collections.<DataflowPackage>emptyList())
             .getJob();
 
-    assertEquals(13, job.getSteps().size());
+    assertEquals(8, job.getSteps().size());
     Step step = job.getSteps().get(1);
     assertEquals(stepName, getString(step.getProperties(), PropertyNames.USER_NAME));
     assertAllStepOutputsHaveUniqueIds(job);

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 33fe323..5cba3f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -886,16 +886,16 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
         }
 
-        org.apache.beam.sdk.io.Write<T> write = null;
+        WriteFiles<T> write = null;
         if (filenamePolicy != null) {
-          write = org.apache.beam.sdk.io.Write.to(
+          write = WriteFiles.to(
               new AvroSink<>(
                   filenamePolicy,
                   AvroCoder.of(type, schema),
                   codec,
                   metadata));
         } else {
-          write = org.apache.beam.sdk.io.Write.to(
+          write = WriteFiles.to(
               new AvroSink<>(
                   filenamePrefix,
                   filenameSuffix,

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9b5f130..d9682e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IOChannelFactory;
@@ -70,12 +71,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
  * output and defines the format of output files (how values are written, headers/footers, MIME
  * type, etc.).
  *
  * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
- * and to create a {@link Sink.WriteOperation} that manages the process of writing to the sink.
+ * and to create a {@link FileBasedWriteOperation} that manages the process of writing to the sink.
  *
  * <p>The process of writing to file-based sink is as follows:
  * <ol>
@@ -84,11 +85,28 @@ import org.slf4j.LoggerFactory;
  * <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
+ * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
+ * event of failure/retry or for redundancy). However, exactly one of these executions will have its
+ * result passed to the finalize method. Each call to {@link FileBasedWriter#openWindowed}
+ * or {@link FileBasedWriter#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
+ * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
+ * identifying their output.
+ *
+ * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
+ * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
+ * will encode the unique bundle id to avoid conflicts with other writers.
+ *
+ * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered
+ * PCollections into separate files per window pane. This allows file output from unbounded
+ * PCollections, and also works for bounded PCollections.
+ *
  * <p>Supported file systems are those registered with {@link IOChannelUtils}.
  *
  * @param <T> the type of values written to the sink.
  */
-public abstract class FileBasedSink<T> extends Sink<T> {
+public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
   /**
@@ -385,19 +403,15 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     return fileNamePolicy;
   }
 
-  @Override
   public void validate(PipelineOptions options) {}
 
   /**
    * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
    * to the sink.
    */
-  @Override
   public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
 
-  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
     getFileNamePolicy().populateDisplayData(builder);
   }
 
@@ -417,8 +431,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
   }
 
   /**
-   * Abstract {@link Sink.WriteOperation} that manages the process of writing to a
-   * {@link FileBasedSink}.
+   * Abstract operation that manages the process of writing to a {@link FileBasedSink}.
    *
    * <p>The primary responsibilities of the FileBasedWriteOperation is the management of output
    * files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary file
@@ -457,7 +470,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    *
    * @param <T> the type of values written to the sink.
    */
-  public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
+  public abstract static class FileBasedWriteOperation<T> implements Serializable {
     /**
      * The Sink that this WriteOperation will write to.
      */
@@ -531,27 +544,18 @@ public abstract class FileBasedSink<T> extends Sink<T> {
 
     /**
      * Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}. This
-     * method must satisfy the restrictions placed on implementations of
-     * {@link Sink.WriteOperation#createWriter}. Namely, it must not mutate the state of the object.
+     * method must not mutate the state of the object.
      */
-    @Override
     public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
 
-    @Override
+    /**
+     * Indicates that the operation will be performing windowed writes.
+     */
     public void setWindowedWrites(boolean windowedWrites) {
       this.windowedWrites = windowedWrites;
     }
 
     /**
-     * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass
-     * implementations to perform initialization of the sink at pipeline runtime. This method must
-     * be idempotent and is subject to the same implementation restrictions as
-     * {@link Sink.WriteOperation#initialize}.
-     */
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
-
-    /**
      * Finalizes writing by copying temporary output files to their final location and optionally
      * removing temporary files.
      *
@@ -565,10 +569,8 @@ public abstract class FileBasedSink<T> extends Sink<T> {
      *
      * @param writerResults the results of writes (FileResult).
      */
-    @Override
     public void finalize(Iterable<FileResult> writerResults,
-                         PipelineOptions options)
-        throws Exception {
+                         PipelineOptions options) throws Exception {
       // Collect names of temporary files and rename them.
       Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
       copyToOutputFiles(outputFilenames, options);
@@ -696,24 +698,22 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     /**
      * Provides a coder for {@link FileBasedSink.FileResult}.
      */
-    @Override
-    public Coder<FileResult> getWriterResultCoder() {
+    public final Coder<FileResult> getFileResultCoder() {
       return FileResultCoder.of();
     }
 
     /**
      * Returns the FileBasedSink for this write operation.
      */
-    @Override
     public FileBasedSink<T> getSink() {
       return sink;
     }
   }
 
   /**
-   * Abstract {@link Sink.Writer} that writes a bundle to a {@link FileBasedSink}. Subclass
-   * implementations provide a method that can write a single value to a {@link WritableByteChannel}
-   * ({@link Sink.Writer#write}).
+   * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
+   * implementations provide a method that can write a single value to a
+   * {@link WritableByteChannel}.
    *
    * <p>Subclass implementations may also override methods that write headers and footers before and
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
@@ -724,7 +724,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
    *
    * @param <T> the type of values to write.
    */
-  public abstract static class FileBasedWriter<T> extends Writer<T, FileResult> {
+  public abstract static class FileBasedWriter<T> {
     private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
 
     final FileBasedWriteOperation<T> writeOperation;
@@ -793,9 +793,17 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     protected void finishWrite() throws Exception {}
 
     /**
-     * Opens the channel.
+     *  Performs bundle initialization. For example, creates a temporary file for writing or
+     * initializes any state that will be used across calls to {@link FileBasedWriter#write}.
+     *
+     * <p>The unique id that is given to open should be used to ensure that the writer's output
+     * does not interfere with the output of other Writers, as a bundle may be executed many
+     * times for fault tolerance.
+     *
+     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
+     * shard and numShards are populated for the case of static sharding. In cases where the
+     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
      */
-    @Override
     public final void openWindowed(String uId,
                                    BoundedWindow window,
                                    PaneInfo paneInfo,
@@ -807,7 +815,15 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       open(uId, window, paneInfo, shard, numShards);
     }
 
-    @Override
+    /**
+     * Called for each value in the bundle.
+     */
+    public abstract void write(T value) throws Exception;
+
+    /**
+     * Similar to {@link #openWindowed}, but for the case where unwindowed writes were
+     * requested.
+     */
     public final void openUnwindowed(String uId,
                                      int shard,
                                      int numShards) throws Exception {
@@ -854,8 +870,7 @@ public abstract class FileBasedSink<T> extends Sink<T> {
       LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
     }
 
-    @Override
-    public void cleanup() throws Exception {
+    public final void cleanup() throws Exception {
       if (filename != null) {
         IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
       }
@@ -864,7 +879,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     /**
      * Closes the channel and returns the bundle result.
      */
-    @Override
     public final FileResult close() throws Exception {
       try (WritableByteChannel theChannel = channel) {
         LOG.debug("Writing footer to {}.", filename);
@@ -892,7 +906,6 @@ public abstract class FileBasedSink<T> extends Sink<T> {
     /**
      * Return the FileBasedWriteOperation that this Writer belongs to.
      */
-    @Override
     public FileBasedWriteOperation<T> getWriteOperation() {
       return writeOperation;
     }
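
For reference, a minimal concrete sink against the reshaped API might look like the sketch
below. It assumes the FileBasedWriteOperation(sink) and FileBasedWriter(writeOperation)
constructors and the prepareWrite hook, which live in unchanged parts of this file and are
not visible in the hunk; the SimpleSink test class added by this commit plays the same role.

    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical sink that writes each element as one line of text.
    class LineSink extends FileBasedSink<String> {
      LineSink(String baseOutputFilename) {
        super(baseOutputFilename, "txt");
      }

      @Override
      public FileBasedWriteOperation<String> createWriteOperation(PipelineOptions options) {
        return new FileBasedWriteOperation<String>(this) {
          @Override
          public FileBasedWriter<String> createWriter(PipelineOptions writerOptions)
              throws Exception {
            return new FileBasedWriter<String>(this) {
              private WritableByteChannel channel;

              @Override
              protected void prepareWrite(WritableByteChannel channel) throws Exception {
                this.channel = channel;  // temp-file channel, opened once per bundle
              }

              @Override
              public void write(String value) throws Exception {
                channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
              }
            };
          }
        };
      }
    }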

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
deleted file mode 100644
index ba1afbb..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java
+++ /dev/null
@@ -1,295 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * A {@code Sink} represents a resource that can be written to using the {@link Write} transform.
- *
- * <p>A parallel write to a {@code Sink} consists of three phases:
- * <ol>
- * <li>A sequential <i>initialization</i> phase (e.g., creating a temporary output directory, etc.)
- * <li>A <i>parallel write</i> phase where workers write bundles of records
- * <li>A sequential <i>finalization</i> phase (e.g., committing the writes, merging output files,
- * etc.)
- * </ol>
- *
- * <p>The {@link Write} transform can be used in a pipeline to perform this write.
- * Specifically, a Write transform can be applied to a {@link PCollection} {@code p} by:
- *
- * <p>{@code p.apply(Write.to(new MySink()));}
- *
- * <p>Implementing a {@link Sink} and the corresponding write operations requires extending three
- * abstract classes:
- *
- * <ul>
- * <li>{@link Sink}: an immutable logical description of the location/resource to write to.
- * Depending on the type of sink, it may contain fields such as the path to an output directory
- * on a filesystem, a database table name, etc. Implementors of {@link Sink} must
- * implement two methods: {@link Sink#validate} and {@link Sink#createWriteOperation}.
- * {@link Sink#validate Validate} is called by the Write transform at pipeline creation, and should
- * validate that the Sink can be written to. The createWriteOperation method is also called at
- * pipeline creation, and should return a WriteOperation object that defines how to write to the
- * Sink. Note that implementations of Sink must be serializable and Sinks must be immutable.
- *
- * <li>{@link WriteOperation}: The WriteOperation implements the <i>initialization</i> and
- * <i>finalization</i> phases of a write. Implementors of {@link WriteOperation} must implement
- * corresponding {@link WriteOperation#initialize} and {@link WriteOperation#finalize} methods. A
- * WriteOperation must also implement {@link WriteOperation#createWriter} that creates Writers,
- * {@link WriteOperation#getWriterResultCoder} that returns a {@link Coder} for the result of a
- * parallel write, and a {@link WriteOperation#getSink} that returns the Sink that the write
- * operation corresponds to. See below for more information about these methods and restrictions on
- * their implementation.
- *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines several methods:
- * {@link Writer#openWindowed} and {@link Writer#openUnwindowed}, which are called once at the
- * start of writing a bundle, depending on whether windowed or unwindowed output is requested.
- * {@link Writer#write}, which writes a single record from the bundle; {@link Writer#close},
- * which is called once at the end of writing a bundle; and {@link Writer#getWriteOperation},
- * which returns the write operation that the writer belongs to.
- * </ul>
- *
- * <h2>WriteOperation</h2>
- *
- * <p>{@link WriteOperation#initialize} and {@link WriteOperation#finalize} are conceptually called
- * once: at the beginning and end of a Write transform. However, implementors must ensure that these
- * methods are idempotent, as they may be called multiple times on different machines in the case of
- * failure/retry or for redundancy.
- *
- * <p>The finalize method of WriteOperation is passed an Iterable of a writer result type. This
- * writer result type should encode the result of a write and, in most cases, some encoding of the
- * unique bundle id.
- *
- * <p>All implementations of {@link WriteOperation} must be serializable.
- *
- * <p>WriteOperation may have mutable state. For instance, {@link WriteOperation#initialize} may
- * mutate the object state. These mutations will be visible in {@link WriteOperation#createWriter}
- * and {@link WriteOperation#finalize} because the object will be serialized after initialize and
- * deserialized before these calls. However, it is not serialized again after createWriter is
- * called, as createWriter will be called within workers to create Writers for the bundles that are
- * distributed to these workers. Therefore, newWriter should not mutate the WriteOperation state (as
- * these mutations will not be visible in finalize).
- *
- * <h2>Bundle Ids:</h2>
- *
- * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
- * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#openWindowed}
- * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the
- * Write transform, so even redundant or retried bundles will have a unique way of identifying
- * their output.
- *
- * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
- * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
- * must be unique to avoid conflicts with other Writers. The bundle id should be encoded in the
- * writer result returned by the Writer and subsequently used by the WriteOperation's finalize
- * method to identify the results of successful writes.
- *
- * <p>For example, consider the scenario where a Writer writes files containing serialized records
- * and the WriteOperation's finalization step is to merge or rename these output files. In this
- * case, a Writer may use its unique id to name its output file (to avoid conflicts) and return the
- * name of the file it wrote as its writer result. The WriteOperation will then receive an Iterable
- * of output file names that it can then merge or rename using some bundle naming scheme.
- *
- * <h2>Writer Results:</h2>
- *
- * <p>{@link WriteOperation}s and {@link Writer}s must agree on a writer result type that will be
- * returned by a Writer after it writes a bundle. This type can be a client-defined object or an
- * existing type; {@link WriteOperation#getWriterResultCoder} should return a {@link Coder} for the
- * type.
- *
- * <p>A note about thread safety: Any use of static members or methods in Writer should be thread
- * safe, as different instances of Writer objects may be created in different threads on the same
- * worker.
- *
- * @param <T> the type that will be written to the Sink.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Sink<T> implements Serializable, HasDisplayData {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins. One
-   * should use {@link com.google.common.base.Preconditions} to implement this method.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
-
-  /**
-   * {@inheritDoc}
-   *
-   * <p>By default, does not register any display data. Implementors may override this method
-   * to provide their own display data.
-   */
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {}
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Beam may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to {@code initialize} method and deserialized before calls to
-   * {@code createWriter} and {@code finalized}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Indicates that the operation will be performing windowed writes.
-     */
-    public abstract void setWindowedWrites(boolean windowedWrites);
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should perform clean up of any failed and successfully retried bundles.  Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#openWindowed} or {@link Writer#openUnwindowed}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type.
-     */
-    public abstract Coder<WriteT> getWriterResultCoder();
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink.
-   * {@link Writer#openWindowed} or {@link Writer#openUnwindowed} is called before writing begins
-   * and {@link Writer#close} is called after all elements in the bundle have been written.
-   * {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
-     * shard and numbShards are populated for the case of static sharding. In cases where the
-     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
-     */
-    public abstract void openWindowed(String uId,
-                                      BoundedWindow window,
-                                      PaneInfo paneInfo,
-                                      int shard,
-                                      int numShards) throws Exception;
-
-    /**
-     * Perform bundle initialization for the case where the file is written unwindowed.
-     */
-    public abstract void openUnwindowed(String uId,
-                                        int shard,
-                                        int numShards) throws Exception;
-
-    public abstract void cleanup() throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
-     * successful writes. See {@link Sink} for more information about bundle ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-
-
-  }
-}
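
The deleted javadoc above documented the old three-class contract (Sink, WriteOperation,
Writer) and its entry point, p.apply(Write.to(new MySink())). A before/after sketch of the
user-facing change, where MySink (an old-style Sink<String>) and MyFileSink (a
FileBasedSink<String> subclass) are hypothetical classes and p is an existing Pipeline:

    // Before this commit: any Sink<String> subclass could be attached via Write.
    //   p.apply(Create.of(lines))
    //    .apply("WriteViaSink", Write.to(new MySink(outputPath)));
    //
    // After: writes go through WriteFiles, which only accepts a FileBasedSink.
    p.apply(Create.of(lines))
     .apply("WriteViaFiles", WriteFiles.to(new MyFileSink(outputPath)));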

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 748086d..34dbe21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -343,9 +343,9 @@ public class TFRecordIO {
       if (getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of a TFRecordIO.Write transform");
-      }
-      org.apache.beam.sdk.io.Write<byte[]> write =
-          org.apache.beam.sdk.io.Write.to(
+     }
+      org.apache.beam.sdk.io.WriteFiles<byte[]> write =
+          org.apache.beam.sdk.io.WriteFiles.to(
               new TFRecordSink(
                   getFilenamePrefix(),
                   getFilenameSuffix(),

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index ea80639..fbd76df 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -702,12 +702,12 @@ public class TextIO {
           throw new IllegalStateException(
               "cannot set both a filename policy and a filename prefix");
         }
-        org.apache.beam.sdk.io.Write<String> write = null;
+        WriteFiles<String> write = null;
         if (filenamePolicy != null) {
-         write = org.apache.beam.sdk.io.Write.to(
+         write = WriteFiles.to(
              new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
         } else {
-          write = org.apache.beam.sdk.io.Write.to(
+          write = WriteFiles.to(
               new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
                   writableByteChannelFactory));
         }
@@ -717,7 +717,7 @@ public class TextIO {
         if (windowedWrites) {
           write = write.withWindowedWrites();
         }
-        return input.apply("Write", write);
+        return input.apply("WriteFiles", write);
       }
 
       @Override
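
End-user IO code is unaffected by the rename: TextIO.Write (and likewise AvroIO.Write and
TFRecordIO.Write above) still assembles the underlying transform itself, now expanding into
a composite step named "WriteFiles" instead of "Write". A small usage sketch against the
TextIO API of this era, with a placeholder output path:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class WriteLinesExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("first line", "second line"))
            // Internally becomes WriteFiles.to(new TextSink(...)), as in the hunk above;
            // withNumShards(3) is forwarded to WriteFiles#withNumShards.
            .apply(TextIO.Write.to("/tmp/write-files-example").withNumShards(3));
        p.run();
      }
    }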

http://git-wip-us.apache.org/repos/asf/beam/blob/6a6a1a8c/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
deleted file mode 100644
index 16f3eb6..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ /dev/null
@@ -1,603 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.Sink.WriteOperation;
-import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
- * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}.
- *
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a
- * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
- * least 1 output will always be produced. The exact parallelism of the write stage can be
- * controlled using {@link Write#withNumShards}, typically used to control how many files are
- * produced or to globally limit the number of workers connecting to an external service. However,
- * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
- *
- * <p>{@code Write} re-windows the data into the global window, so it is typically not well suited
- * to use in streaming pipelines.
- *
- * <p>Example usage with runner-determined sharding:
- *
- * <pre>{@code p.apply(Write.to(new MySink(...)));}</pre>
- *
- * <p>Example usage with a fixed number of shards:
- *
- * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write<T> extends PTransform<PCollection<T>, PDone> {
-  private static final Logger LOG = LoggerFactory.getLogger(Write.class);
-
-  private static final int UNKNOWN_SHARDNUM = -1;
-  private static final int UNKNOWN_NUMSHARDS = -1;
-
-  private final Sink<T> sink;
-  // This allows the number of shards to be dynamically computed based on the input
-  // PCollection.
-  @Nullable
-  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to be updatable
-  // when a pipeline is updated.
-  @Nullable
-  private final ValueProvider<Integer> numShardsProvider;
-  private boolean windowedWrites;
-
-  /**
-   * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
-   * control how many different shards are produced.
-   */
-  public static <T> Write<T> to(Sink<T> sink) {
-    checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */, null, false);
-  }
-
-  private Write(
-      Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-  }
-
-  @Override
-  public PDone expand(PCollection<T> input) {
-    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
-        "%s can only be applied to an unbounded PCollection if doing windowed writes",
-        Write.class.getSimpleName());
-    PipelineOptions options = input.getPipeline().getOptions();
-    sink.validate(options);
-    return createWrite(input, sink.createWriteOperation(options));
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else if (getNumShards() != null) {
-      String numShards = getNumShards().isAccessible()
-          ? getNumShards().get().toString() : getNumShards().toString();
-      builder.add(DisplayData.item("numShards", numShards)
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
-  /**
-   * Returns the {@link Sink} associated with this PTransform.
-   */
-  public Sink<T> getSink() {
-    return sink;
-  }
-
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-   * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-   * #withRunnerDeterminedSharding()}.
-   */
-  @Nullable
-  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-    return computeNumShards;
-  }
-
-  public ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   *
-   * <p>A value less than or equal to 0 will be equivalent to the default behavior of
-   * runner-determined sharding.
-   */
-  public Write<T> withNumShards(int numShards) {
-    if (numShards > 0) {
-      return withNumShards(StaticValueProvider.of(numShards));
-    }
-    return withRunnerDeterminedSharding();
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * {@link ValueProvider} specified number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new Write<>(sink, null, numShardsProvider, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} using the
-   * specified {@link PTransform} to compute the number of shards.
-   *
-   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-   * more information.
-   */
-  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
-    checkNotNull(
-        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that will write to the current {@link Sink} with
-   * runner-determined sharding.
-   */
-  public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null, null, windowedWrites);
-  }
-
-  /**
-   * Returns a new {@link Write} that preserves windowing on its input.
-   *
-   * <p>If this option is not specified, windowing and triggering are replaced by
-   * {@link GlobalWindows} and {@link DefaultTrigger}.
-   *
-   * <p>If there is no data for a window, no output shards will be generated for that window.
-   * If a window triggers multiple times, then more than one output shard might be
-   * generated for that window; it is up to the sink implementation to keep these output
-   * shards unique.
-   *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
-   * positive value.
-   */
-  public Write<T> withWindowedWrites() {
-    return new Write<>(sink, computeNumShards, numShardsProvider, true);
-  }
-
-  /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link Sink}.
-   */
-  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-    // Writer that will write the records in this bundle. Lazily
-    // initialized in processElement.
-    private Writer<T, WriteT> writer = null;
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-      this.writeOperationView = writeOperationView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Lazily initialize the Writer
-      if (writer == null) {
-        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
-
-        if (windowedWrites) {
-          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
-              UNKNOWN_NUMSHARDS);
-        } else {
-          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-        }
-        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-      }
-      try {
-        writer.write(c.element());
-      } catch (Exception e) {
-        // Discard write result and close the write.
-        try {
-          writer.close();
-          // The writer does not need to be reset, as this DoFn cannot be reused.
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
-          }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
-        }
-        throw e;
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      if (writer != null) {
-        WriteT result = writer.close();
-        c.output(result);
-        // Reset state in case of reuse.
-        writer = null;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  /**
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-   * a single iterable.
-   *
-   * @see WriteBundles
-   */
-  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-    private final PCollectionView<Integer> numShardsView;
-
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
-                        PCollectionView<Integer> numShardsView) {
-      this.writeOperationView = writeOperationView;
-      this.numShardsView = numShardsView;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
-      // In a sharded write, a single input element represents one shard. We can open and close
-      // the writer in each call to processElement.
-      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-      LOG.info("Opening writer for write operation {}", writeOperation);
-      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-      if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
-            numShards);
-      } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
-      }
-      LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-
-      try {
-        try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
-          }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
-        }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
-      } catch (Exception e) {
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
-        throw e;
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(Write.this);
-    }
-  }
-
-  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
-    private int shardNumber;
-
-    ApplyShardingKey(PCollectionView<Integer> numShardsView,
-                     ValueProvider<Integer> numShardsProvider) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext context) {
-      int shardCount = 0;
-      if (numShardsView != null) {
-        shardCount = context.sideInput(numShardsView);
-      } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
-      }
-      checkArgument(
-          shardCount > 0,
-          "Must have a positive number of shards specified for non-runner-determined sharding."
-              + " Got %s",
-          shardCount);
-      if (shardNumber == UNKNOWN_SHARDNUM) {
-        // We want to desynchronize the first record sharding key for each instance of
-        // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-      } else {
-        shardNumber = (shardNumber + 1) % shardCount;
-      }
-      context.output(KV.of(shardNumber, context.element()));
-    }
-  }
-
-  /**
-   * A write is performed as a sequence of three {@link ParDo}s.
-   *
-   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-   * called. The output of this ParDo is a singleton PCollection
-   * containing the WriteOperation.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-   * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
-   * every element in the bundle. The output of this ParDo is a PCollection of
-   * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
-   * each bundle.
-   *
-   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-   * the collection of writer results as a side-input. In this ParDo,
-   * {@link WriteOperation#finalize} is called to finalize the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * the WriteOperation.
-   */
-  private <WriteT> PDone createWrite(
-      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-    Pipeline p = input.getPipeline();
-    writeOperation.setWindowedWrites(windowedWrites);
-
-    // A coder to use for the WriteOperation.
-    @SuppressWarnings("unchecked")
-    Coder<WriteOperation<T, WriteT>> operationCoder =
-        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-    // the sink.
-    PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
-
-    // Initialize the resource in a do-once ParDo on the WriteOperation.
-    operationCollection = operationCollection
-        .apply("Initialize", ParDo.of(
-            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) throws Exception {
-            WriteOperation<T, WriteT> writeOperation = c.element();
-            LOG.info("Initializing write operation {}", writeOperation);
-            writeOperation.initialize(c.getPipelineOptions());
-            writeOperation.setWindowedWrites(windowedWrites);
-            LOG.debug("Done initializing write operation {}", writeOperation);
-            // The WriteOperation is also the output of this ParDo, so it can have mutable
-            // state.
-            c.output(writeOperation);
-          }
-        }))
-        .setCoder(operationCoder);
-
-    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing triggers.
-      input =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-    }
-
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-    // as a side input) and collect the results of the writes in a PCollection.
-    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-    // as a side input), so this will happen after the initial ParDo.
-    PCollection<WriteT> results;
-    final PCollectionView<Integer> numShardsView;
-    if (computeNumShards == null && numShardsProvider == null) {
-      if (windowedWrites) {
-        throw new IllegalStateException("When doing windowed writes, numShards must be set"
-            + " explicitly to a positive value");
-      }
-      numShardsView = null;
-      results = input
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles<>(writeOperationView))
-                  .withSideInputs(writeOperationView));
-    } else {
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        results  = input
-            .apply("ApplyShardLabel", ParDo.of(
-                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
-                    .withSideInputs(numShardsView, writeOperationView));
-      } else {
-        numShardsView = null;
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
-                    .withSideInputs(writeOperationView));
-      }
-    }
-    results.setCoder(writeOperation.getWriterResultCoder());
-
-    if (windowedWrites) {
-      // When processing streaming windowed writes, results will arrive multiple times. This
-      // means we can't share the below implementation that turns the results into a side input,
-      // as new data arriving into a side input does not trigger the listening DoFn. Instead
-      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
-      // whenever new data arrives.
-      PCollection<KV<Void, WriteT>> keyedResults =
-          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
-      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
-          .getWriterResultCoder()));
-
-      // Is the continuation trigger sufficient?
-      keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
-          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(writeOperationView));
-    } else {
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        sideInputs.add(numShardsView);
-      }
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
-      // results collection as a side input), so it will happen after the parallel write.
-      // For the non-windowed case, we guarantee that if no data is written but the user has
-      // set numShards, then all shards will be written out as empty files. For this reason we
-      // use a side input here.
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if
-              // set.
-              int minShardsNeeded;
-              if (numShardsView != null) {
-                minShardsNeeded = c.sideInput(numShardsView);
-              } else if (numShardsProvider != null) {
-                minShardsNeeded = numShardsProvider.get();
-              } else {
-                minShardsNeeded = 1;
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written for a total of "
-                        + "{}.", extraShardsNeeded, results.size(), minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
-                      UNKNOWN_NUMSHARDS);
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-    }
-    return PDone.in(input.getPipeline());
-  }
-}
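
The usage examples from the removed class's javadoc carry over to its replacement. A hedged end-to-end sketch, assuming WriteFiles preserves the to()/withNumShards() surface of the removed Write transform and using a hypothetical MySink (a FileBasedSink<String>):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class WriteFilesExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Runner-determined sharding (the default), as in the removed javadoc example.
    p.apply("CreateA", Create.of("a", "b", "c"))
        .apply("WriteRunnerSharded", WriteFiles.to(new MySink("/tmp/out")));

    // Fixed sharding; use sparingly, since it adds a GroupByKey to the pipeline.
    p.apply("CreateD", Create.of("d", "e", "f"))
        .apply("WriteThreeShards", WriteFiles.to(new MySink("/tmp/out3")).withNumShards(3));

    p.run();
  }
}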