Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:43 UTC

[16/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7013044..2fd10ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,12 +20,9 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -33,12 +30,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
@@ -49,11 +42,9 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -62,17 +53,11 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.ShardedKey;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,12 +66,13 @@ import org.slf4j.LoggerFactory;
  * global initialization of a sink, followed by a parallel write, and ends with a sequential
  * finalization of the write. The output of a write is {@link PDone}.
  *
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
- * WriteOperation}, so the number of outputs will vary based on runner behavior, though at least 1
- * output will always be produced. The exact parallelism of the write stage can be controlled using
- * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
- * globally limit the number of workers connecting to an external service. However, this option can
- * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a
+ * {@link WriteOperation}, so the number of outputs
+ * will vary based on runner behavior, though at least 1 output will always be produced. The
+ * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
+ * typically used to control how many files are produced or to globally limit the number of
+ * workers connecting to an external service. However, this option can often hurt performance: it
+ * adds an additional {@link GroupByKey} to the pipeline.
  *
  * <p>Example usage with runner-determined sharding:
  *
@@ -97,70 +83,44 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<UserT, DestinationT, OutputT>
-    extends PTransform<PCollection<UserT>, PDone> {
+public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
-  // The maximum number of file writers to keep open in a single bundle at a time, since file
-  // writers default to 64mb buffers. This comes into play when writing per-window files.
-  // The first 20 files from a single WriteFiles transform will write files inline in the
-  // transform. Anything beyond that might be shuffled.
-  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
-  // their own policy.
-  private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
-
-  // When we spill records, shard the output keys to prevent hotspots.
-  // We could consider making this a parameter.
-  private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
-
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<OutputT, DestinationT> sink;
-  private SerializableFunction<UserT, OutputT> formatFunction;
-  private WriteOperation<OutputT, DestinationT> writeOperation;
+  private FileBasedSink<T> sink;
+  private WriteOperation<T> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
-  @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
   // We don't use a side input for static sharding, as we want this value to be updatable
   // when a pipeline is updated.
   @Nullable
   private final ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
-  private int maxNumWritersPerBundle;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
    * the runner control how many different shards are produced.
    */
-  public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction) {
+  public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
     checkNotNull(sink, "sink");
-    return new WriteFiles<>(
-        sink,
-        formatFunction,
-        null /* runner-determined sharding */,
-        null,
-        false,
-        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+    return new WriteFiles<>(sink, null /* runner-determined sharding */, null, false);
   }
 
   private WriteFiles(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction,
-      @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
+      FileBasedSink<T> sink,
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
       @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites,
-      int maxNumWritersPerBundle) {
+      boolean windowedWrites) {
     this.sink = sink;
-    this.formatFunction = checkNotNull(formatFunction);
     this.computeNumShards = computeNumShards;
     this.numShardsProvider = numShardsProvider;
     this.windowedWrites = windowedWrites;
-    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
   }
 
   @Override
-  public PDone expand(PCollection<UserT> input) {
+  public PDone expand(PCollection<T> input) {
     if (input.isBounded() == IsBounded.UNBOUNDED) {
       checkArgument(windowedWrites,
           "Must use windowed writes when applying %s to an unbounded PCollection",
@@ -199,16 +159,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  /** Returns the {@link FileBasedSink} associated with this PTransform. */
-  public FileBasedSink<OutputT, DestinationT> getSink() {
+  /**
+   * Returns the {@link FileBasedSink} associated with this PTransform.
+   */
+  public FileBasedSink<T> getSink() {
     return sink;
   }
 
-  /** Returns the format function that maps the user type to the record written to files. */
-  public SerializableFunction<UserT, OutputT> getFormatFunction() {
-    return formatFunction;
-  }
-
   /**
    * Returns whether or not to perform windowed writes.
    */
@@ -223,7 +180,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * #withRunnerDeterminedSharding()}.
    */
   @Nullable
-  public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
+  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
     return computeNumShards;
   }
 
@@ -241,7 +198,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * <p>A value less than or equal to 0 will be equivalent to the default behavior of
    * runner-determined sharding.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) {
+  public WriteFiles<T> withNumShards(int numShards) {
     if (numShards > 0) {
       return withNumShards(StaticValueProvider.of(numShards));
     }
@@ -255,27 +212,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
    * more information.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
-      ValueProvider<Integer> numShardsProvider) {
-    return new WriteFiles<>(
-        sink,
-        formatFunction,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle);
-  }
-
-  /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
-  public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
-      int maxNumWritersPerBundle) {
-    return new WriteFiles<>(
-        sink,
-        formatFunction,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle);
+  public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites);
   }
 
   /**
@@ -285,169 +223,127 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
    * more information.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withSharding(
-      PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
+  public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new WriteFiles<>(
-        sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
+    return new WriteFiles<>(sink, sharding, null, windowedWrites);
   }
 
   /**
    * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
    * runner-determined sharding.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(
-        sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
+  public WriteFiles<T> withRunnerDeterminedSharding() {
+    return new WriteFiles<>(sink, null, null, windowedWrites);
   }
 
   /**
   * Returns a new {@link WriteFiles} that preserves windowing on its input.
    *
-   * <p>If this option is not specified, windowing and triggering are replaced by {@link
-   * GlobalWindows} and {@link DefaultTrigger}.
+   * <p>If this option is not specified, windowing and triggering are replaced by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
    *
-   * <p>If there is no data for a window, no output shards will be generated for that window. If a
-   * window triggers multiple times, then more than a single output shard might be generated
-   * multiple times; it's up to the sink implementation to keep these output shards unique.
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, then more than a single output shard might be
+   * generated multiple times; it's up to the sink implementation to keep these output shards
+   * unique.
    *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+   * positive value.
    */
-  public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
-    return new WriteFiles<>(
-        sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+  public WriteFiles<T> withWindowedWrites() {
+    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true);
   }
 
-  private static class WriterKey<DestinationT> {
-    private final BoundedWindow window;
-    private final PaneInfo paneInfo;
-    private final DestinationT destination;
+  /**
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
+   */
+  private class WriteWindowedBundles extends DoFn<T, FileResult> {
+    private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
 
-    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
-      this.window = window;
-      this.paneInfo = paneInfo;
-      this.destination = destination;
+    @StartBundle
+    public void startBundle(StartBundleContext c) {
+      // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
+      windowedWriters = Maps.newHashMap();
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof WriterKey)) {
-        return false;
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      PaneInfo paneInfo = c.pane();
+      Writer<T> writer;
+      // If we are doing windowed writes, we need to ensure that we have separate files for
+      // data in different windows/panes.
+      KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
+      writer = windowedWriters.get(key);
+      if (writer == null) {
+        String uuid = UUID.randomUUID().toString();
+        LOG.info(
+            "Opening writer {} for write operation {}, window {} pane {}",
+            uuid,
+            writeOperation,
+            window,
+            paneInfo);
+        writer = writeOperation.createWriter();
+        writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
+        windowedWriters.put(key, writer);
+        LOG.debug("Done opening writer");
       }
-      WriterKey other = (WriterKey) o;
-      return Objects.equal(window, other.window)
-          && Objects.equal(paneInfo, other.paneInfo)
-          && Objects.equal(destination, other.destination);
+
+      writeOrClose(writer, c.element());
     }
 
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(window, paneInfo, destination);
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) throws Exception {
+      for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) {
+        FileResult result = entry.getValue().close();
+        BoundedWindow window = entry.getKey().getKey();
+        c.output(result, window.maxTimestamp(), window);
+      }
     }
-  }
 
-  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
-  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
-  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
-  // this can be used as a key.
-  private static <DestinationT> int hashDestination(
-      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
-    return Hashing.murmur3_32()
-        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
-        .asInt();
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(WriteFiles.this);
+    }
   }
 
   /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
-   * WriteOperation} associated with the {@link FileBasedSink}.
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled.
    */
-  private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
-    private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
-    private final Coder<DestinationT> destinationCoder;
-    private final boolean windowedWrites;
-
-    private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
-    private int spilledShardNum = UNKNOWN_SHARDNUM;
-
-    WriteBundles(
-        boolean windowedWrites,
-        TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
-        Coder<DestinationT> destinationCoder) {
-      this.windowedWrites = windowedWrites;
-      this.unwrittenRecordsTag = unwrittenRecordsTag;
-      this.destinationCoder = destinationCoder;
-    }
+  private class WriteUnwindowedBundles extends DoFn<T, FileResult> {
+    // Writer that will write the records in this bundle. Lazily
+    // initialized in processElement.
+    private Writer<T> writer = null;
+    private BoundedWindow window = null;
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
       // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
-      writers = Maps.newHashMap();
+      writer = null;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      PaneInfo paneInfo = c.pane();
-      // If we are doing windowed writes, we need to ensure that we have separate files for
-      // data in different windows/panes. Similar for dynamic writes, make sure that different
-      // destinations go to different writers.
-      // In the case of unwindowed writes, the window and the pane will always be the same, and
-      // the map will only have a single element.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
-      WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
-      Writer<OutputT, DestinationT> writer = writers.get(key);
+      // Cache a single writer for the bundle.
       if (writer == null) {
-        if (writers.size() <= maxNumWritersPerBundle) {
-          String uuid = UUID.randomUUID().toString();
-          LOG.info(
-              "Opening writer {} for write operation {}, window {} pane {} destination {}",
-              uuid,
-              writeOperation,
-              window,
-              paneInfo,
-              destination);
-          writer = writeOperation.createWriter();
-          if (windowedWrites) {
-            writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
-          } else {
-            writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
-          }
-          writers.put(key, writer);
-          LOG.debug("Done opening writer");
-        } else {
-          if (spilledShardNum == UNKNOWN_SHARDNUM) {
-            // Cache the random value so we only call ThreadLocalRandom once per DoFn instance.
-            spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
-          } else {
-            spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
-          }
-          c.output(
-              unwrittenRecordsTag,
-              KV.of(
-                  ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
-                  c.element()));
-          return;
-        }
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter();
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+        LOG.debug("Done opening writer");
       }
-      writeOrClose(writer, formatFunction.apply(c.element()));
+      this.window = window;
+      writeOrClose(this.writer, c.element());
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
-          writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
-        FileResult<DestinationT> result;
-        try {
-          result = writer.close();
-        } catch (Exception e) {
-          // If anything goes wrong, make sure to delete the temporary file.
-          writer.cleanup();
-          throw e;
-        }
-        BoundedWindow window = entry.getKey().window;
-        c.output(result, window.maxTimestamp(), window);
+      if (writer == null) {
+        return;
       }
+      FileResult result = writer.close();
+      c.output(result, window.maxTimestamp(), window);
     }
 
     @Override
@@ -456,62 +352,38 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
-  /*
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
-   * single iterable.
+  /**
+   * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
+   * for each shard have been collected into a single iterable.
    */
-  private class WriteShardedBundles
-      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    ShardAssignment shardNumberAssignment;
-    WriteShardedBundles(ShardAssignment shardNumberAssignment) {
-      this.shardNumberAssignment = shardNumberAssignment;
-    }
-
+  private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
-      // in this iterable. The number of destinations is generally very small (1000s or less), so
-      // there will rarely be hash collisions.
-      Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
-      for (UserT input : c.element().getValue()) {
-        DestinationT destination = sink.getDynamicDestinations().getDestination(input);
-        Writer<OutputT, DestinationT> writer = writers.get(destination);
-        if (writer == null) {
-          LOG.debug("Opening writer for write operation {}", writeOperation);
-          writer = writeOperation.createWriter();
-          if (windowedWrites) {
-            int shardNumber =
-                shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                    ? c.element().getKey().getShardNumber()
-                    : UNKNOWN_SHARDNUM;
-            writer.openWindowed(
-                UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
-          } else {
-            writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
-          }
-          LOG.debug("Done opening writer");
-          writers.put(destination, writer);
-        }
-        writeOrClose(writer, formatFunction.apply(input));
-        }
+      // In a sharded write, a single input element represents one shard. We can open and close
+      // the writer in each call to processElement.
+      LOG.info("Opening writer for write operation {}", writeOperation);
+      Writer<T> writer = writeOperation.createWriter();
+      if (windowedWrites) {
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
+      } else {
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+      }
+      LOG.debug("Done opening writer");
 
-      // Close all writers.
-      for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
-        FileResult<DestinationT> result;
-        try {
-          // Close the writer; if this throws let the error propagate.
-          result = writer.close();
-          c.output(result);
-        } catch (Exception e) {
-          // If anything goes wrong, make sure to delete the temporary file.
-          writer.cleanup();
-          throw e;
+      try {
+        for (T t : c.element().getValue()) {
+          writeOrClose(writer, t);
         }
+
+        // Close the writer; if this throws let the error propagate.
+        FileResult result = writer.close();
+        c.output(result);
+      } catch (Exception e) {
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
+        throw e;
       }
-      }
+    }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
@@ -519,15 +391,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private static <OutputT, DestinationT> void writeOrClose(
-      Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
+  private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception {
     try {
       writer.write(t);
     } catch (Exception e) {
       try {
         writer.close();
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
       } catch (Exception closeException) {
         if (closeException instanceof InterruptedException) {
           // Do not silently ignore interrupted state.
@@ -540,25 +409,20 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
+  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
     private final PCollectionView<Integer> numShardsView;
     private final ValueProvider<Integer> numShardsProvider;
-    private final Coder<DestinationT> destinationCoder;
-
     private int shardNumber;
 
-    ApplyShardingKey(
-        PCollectionView<Integer> numShardsView,
-        ValueProvider<Integer> numShardsProvider,
-        Coder<DestinationT> destinationCoder) {
-      this.destinationCoder = destinationCoder;
+    ApplyShardingKey(PCollectionView<Integer> numShardsView,
+                     ValueProvider<Integer> numShardsProvider) {
       this.numShardsView = numShardsView;
       this.numShardsProvider = numShardsProvider;
       shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext context) throws IOException {
+    public void processElement(ProcessContext context) {
       final int shardCount;
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
@@ -578,110 +442,65 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       } else {
         shardNumber = (shardNumber + 1) % shardCount;
       }
-      // We avoid using destination itself as a sharding key, because destination is often large.
-      // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path
-      // to the file. Often most of the path is constant across all destinations, just the path
-      // suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully
-      // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
-      // the destinations. This does mean that multiple destinations might end up on the same shard,
-      // however the number of collisions should be small, so there's no need to worry about memory
-      // issues.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
-      context.output(
-          KV.of(
-              ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
-              context.element()));
+      context.output(KV.of(shardNumber, context.element()));
     }
   }
 
   /**
   * A write is performed as a sequence of three {@link ParDo}s.
    *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
-   * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
-   * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
-   * respectively, and {@link Writer#write} method is called for every element in the bundle. The
-   * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
-   * FileBasedSink} for a description of writer results)-one for each bundle.
+   * <p>This singleton collection containing the WriteOperation is then used as a side
+   * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} and {@link Writer#close} are called in
+   * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
+   * {@link Writer#write} method is called for every element in the bundle. The output
+   * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink}
+   * for a description of writer results)-one for each bundle.
    *
   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
-   * the write.
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
+   * to finalize the write.
    *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be
+   * called before the exception that caused the write to fail is propagated and the write result
+   * will be discarded.
    *
    * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
    * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * WriteOperation.
+   * WriteOperation object that occurs during initialization is visible in the latter
+   * phases. However, the WriteOperation is not serialized after the bundle-writing
+   * phase. This is why implementations should guarantee that
+   * {@link WriteOperation#createWriter} does not mutate WriteOperation.
    */
-  private PDone createWrite(PCollection<UserT> input) {
+  private PDone createWrite(PCollection<T> input) {
     Pipeline p = input.getPipeline();
 
     if (!windowedWrites) {
       // Re-window the data into the global window and remove any existing triggers.
       input =
           input.apply(
-              Window.<UserT>into(new GlobalWindows())
+              Window.<T>into(new GlobalWindows())
                   .triggering(DefaultTrigger.of())
                   .discardingFiredPanes());
     }
 
+
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the
     // WriteOperation as a side input) and collect the results of the writes in a
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollection<FileResult<DestinationT>> results;
+    PCollection<FileResult> results;
     final PCollectionView<Integer> numShardsView;
-    @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
-    final Coder<DestinationT> destinationCoder;
-    try {
-      destinationCoder =
-          sink.getDynamicDestinations()
-              .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
-      destinationCoder.verifyDeterministic();
-    } catch (CannotProvideCoderException | NonDeterministicException e) {
-      throw new RuntimeException(e);
-    }
-
     if (computeNumShards == null && numShardsProvider == null) {
       numShardsView = null;
-      TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
-          new TupleTag<>("unwrittenRecordsTag");
-      String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
-      PCollectionTuple writeTuple =
-          input.apply(
-              writeName,
-              ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
-      PCollection<FileResult<DestinationT>> writtenBundleFiles =
-          writeTuple
-              .get(writtenRecordsTag)
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
-      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
-      // finalize to stay consistent with what WriteWindowedBundles does.
-      PCollection<FileResult<DestinationT>> writtenGroupedFiles =
-          writeTuple
-              .get(unwrittedRecordsTag)
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
-              .apply(
-                  "WriteUnwritten",
-                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
       results =
-          PCollectionList.of(writtenBundleFiles)
-              .and(writtenGroupedFiles)
-              .apply(Flatten.<FileResult<DestinationT>>pCollections());
+          input.apply(
+              "WriteBundles",
+              ParDo.of(windowedWrites ? new WriteWindowedBundles() : new WriteUnwindowedBundles()));
     } else {
       List<PCollectionView<?>> sideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
@@ -690,31 +509,20 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       } else {
         numShardsView = null;
       }
-      PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
+
+      PCollection<KV<Integer, Iterable<T>>> sharded =
           input
-              .apply(
-                  "ApplyShardLabel",
-                  ParDo.of(
-                          new ApplyShardingKey(
-                              numShardsView,
-                              (numShardsView != null) ? null : numShardsProvider,
-                              destinationCoder))
-                      .withSideInputs(sideInputs))
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
+              .apply("ApplyShardLabel", ParDo.of(
+                  new ApplyShardingKey<T>(numShardsView,
+                      (numShardsView != null) ? null : numShardsProvider))
+                  .withSideInputs(sideInputs))
+              .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
       shardedWindowCoder =
           (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
-      // Since this path might be used by streaming runners processing triggers, it's important
-      // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
-      // strategy works by sorting all FileResult objects and assigning them numbers, which is not
-      // guaranteed to work well when processing triggers - if the finalize step retries it might
-      // see a different Iterable of FileResult objects, and it will assign different shard numbers.
-      results =
-          sharded.apply(
-              "WriteShardedBundles",
-              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+
+      results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
     }
-    results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+    results.setCoder(FileResultCoder.of(shardedWindowCoder));
 
     if (windowedWrites) {
       // When processing streaming windowed writes, results will arrive multiple times. This
@@ -722,31 +530,26 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // as new data arriving into a side input does not trigger the listening DoFn. Instead
       // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
       // whenever new data arrives.
-      PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
-          results.apply(
-              "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
-      keyedResults.setCoder(
-          KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
+      PCollection<KV<Void, FileResult>> keyedResults =
+          results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
+      keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
+          FileResultCoder.of(shardedWindowCoder)));
 
       // Is the continuation trigger sufficient?
       keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
-          .apply(
-              "Finalize",
-              ParDo.of(
-                  new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
-                    @ProcessElement
-                    public void processElement(ProcessContext c) throws Exception {
-                      LOG.info("Finalizing write operation {}.", writeOperation);
-                      List<FileResult<DestinationT>> results =
-                          Lists.newArrayList(c.element().getValue());
-                      writeOperation.finalize(results);
-                      LOG.debug("Done finalizing write operation");
-                    }
-                  }));
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create())
+          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<FileResult> results = Lists.newArrayList(c.element().getValue());
+              writeOperation.finalize(results);
+              LOG.debug("Done finalizing write operation");
+            }
+          }));
     } else {
-      final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
-          results.apply(View.<FileResult<DestinationT>>asIterable());
+      final PCollectionView<Iterable<FileResult>> resultsView =
+          results.apply(View.<FileResult>asIterable());
       ImmutableList.Builder<PCollectionView<?>> sideInputs =
           ImmutableList.<PCollectionView<?>>builder().add(resultsView);
       if (numShardsView != null) {
@@ -762,53 +565,41 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // set numShards, then all shards will be written out as empty files. For this reason we
       // use a side input here.
       PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
-      singletonCollection.apply(
-          "Finalize",
-          ParDo.of(
-                  new DoFn<Void, Integer>() {
-                    @ProcessElement
-                    public void processElement(ProcessContext c) throws Exception {
-                      LOG.info("Finalizing write operation {}.", writeOperation);
-                      List<FileResult<DestinationT>> results =
-                          Lists.newArrayList(c.sideInput(resultsView));
-                      LOG.debug(
-                          "Side input initialized to finalize write operation {}.", writeOperation);
-
-                      // We must always output at least 1 shard, and honor user-specified numShards
-                      // if
-                      // set.
-                      int minShardsNeeded;
-                      if (numShardsView != null) {
-                        minShardsNeeded = c.sideInput(numShardsView);
-                      } else if (numShardsProvider != null) {
-                        minShardsNeeded = numShardsProvider.get();
-                      } else {
-                        minShardsNeeded = 1;
-                      }
-                      int extraShardsNeeded = minShardsNeeded - results.size();
-                      if (extraShardsNeeded > 0) {
-                        LOG.info(
-                            "Creating {} empty output shards in addition to {} written "
-                                + "for a total of {}.",
-                            extraShardsNeeded,
-                            results.size(),
-                            minShardsNeeded);
-                        for (int i = 0; i < extraShardsNeeded; ++i) {
-                          Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
-                          writer.openUnwindowed(
-                              UUID.randomUUID().toString(),
-                              UNKNOWN_SHARDNUM,
-                              sink.getDynamicDestinations().getDefaultDestination());
-                          FileResult<DestinationT> emptyWrite = writer.close();
-                          results.add(emptyWrite);
-                        }
-                        LOG.debug("Done creating extra shards.");
-                      }
-                      writeOperation.finalize(results);
-                      LOG.debug("Done finalizing write operation {}", writeOperation);
-                    }
-                  })
-              .withSideInputs(sideInputs.build()));
+      singletonCollection
+          .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor user-specified numShards if
+              // set.
+              int minShardsNeeded;
+              if (numShardsView != null) {
+                minShardsNeeded = c.sideInput(numShardsView);
+              } else if (numShardsProvider != null) {
+                minShardsNeeded = numShardsProvider.get();
+              } else {
+                minShardsNeeded = 1;
+              }
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written for a total of {}.",
+                    extraShardsNeeded, results.size(), minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  Writer<T> writer = writeOperation.createWriter();
+                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+                  FileResult emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
+              }
+              writeOperation.finalize(results);
+              LOG.debug("Done finalizing write operation {}", writeOperation);
+            }
+          }).withSideInputs(sideInputs.build()));
     }
     return PDone.in(input.getPipeline());
   }

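For orientation, here is a minimal usage sketch against the single-type-parameter WriteFiles API that this revert restores. The sink parameter stands in for any concrete FileBasedSink<String> (none is defined in this diff), and windowed writes require a positive fixed shard count, per the javadoc above:

import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class WriteFilesUsageSketch {
  // Sketch only: writes an already-windowed PCollection of strings to the given sink,
  // using an explicit shard count as required by withWindowedWrites().
  static PDone writeWindowedText(PCollection<String> lines, FileBasedSink<String> sink) {
    return lines.apply(
        "WriteWindowedText",
        WriteFiles.to(sink).withNumShards(3).withWindowedWrites());
  }
}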
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
index b889ec7..99717a4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRangeTracker.java
@@ -71,10 +71,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
         "Trying to return record which is before the last-returned record");
 
     if (position == null) {
-      LOG.info(
-          "Adjusting range start from {} to {} as position of first returned record",
-          range.getStartKey(),
-          recordStart);
       range = range.withStartKey(recordStart);
     }
     position = recordStart;
@@ -91,15 +87,6 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
 
   @Override
   public synchronized boolean trySplitAtPosition(ByteKey splitPosition) {
-    // Sanity check.
-    if (!range.containsKey(splitPosition)) {
-      LOG.warn(
-          "{}: Rejecting split request at {} because it is not within the range.",
-          this,
-          splitPosition);
-      return false;
-    }
-
     // Unstarted.
     if (position == null) {
       LOG.warn(
@@ -119,6 +106,15 @@ public final class ByteKeyRangeTracker implements RangeTracker<ByteKey> {
       return false;
     }
 
+    // Sanity check.
+    if (!range.containsKey(splitPosition)) {
+      LOG.warn(
+          "{}: Rejecting split request at {} because it is not within the range.",
+          this,
+          splitPosition);
+      return false;
+    }
+
     range = range.withEndKey(splitPosition);
     return true;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
deleted file mode 100644
index d3bff37..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRange.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.range;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
-
-/** A restriction represented by a range of integers [from, to). */
-public class OffsetRange
-    implements Serializable,
-    HasDefaultTracker<
-                OffsetRange, org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker> {
-  private final long from;
-  private final long to;
-
-  public OffsetRange(long from, long to) {
-    checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
-    this.from = from;
-    this.to = to;
-  }
-
-  public long getFrom() {
-    return from;
-  }
-
-  public long getTo() {
-    return to;
-  }
-
-  @Override
-  public org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker newTracker() {
-    return new org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker(this);
-  }
-
-  @Override
-  public String toString() {
-    return "[" + from + ", " + to + ')';
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    OffsetRange that = (OffsetRange) o;
-
-    if (from != that.from) {
-      return false;
-    }
-    return to == that.to;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = (int) (from ^ (from >>> 32));
-    result = 31 * result + (int) (to ^ (to >>> 32));
-    return result;
-  }
-
-  public List<OffsetRange> split(long desiredNumOffsetsPerSplit, long minNumOffsetPerSplit) {
-    List<OffsetRange> res = new ArrayList<>();
-    long start = getFrom();
-    long maxEnd = getTo();
-
-    while (start < maxEnd) {
-      long end = start + desiredNumOffsetsPerSplit;
-      end = Math.min(end, maxEnd);
-      // Avoid having a too small range at the end and ensure that we respect minNumOffsetPerSplit.
-      long remaining = maxEnd - end;
-      if ((remaining < desiredNumOffsetsPerSplit / 4) || (remaining < minNumOffsetPerSplit)) {
-        end = maxEnd;
-      }
-      res.add(new OffsetRange(start, end));
-      start = end;
-    }
-    return res;
-  }
-}

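For reference, a small sketch of how the removed OffsetRange.split(...) divides a range, assuming the class exactly as shown above:

import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;

class OffsetRangeSplitSketch {
  public static void main(String[] args) {
    // Split [0, 100) into pieces of roughly 30 offsets, with at least 15 offsets each.
    List<OffsetRange> parts = new OffsetRange(0, 100).split(30, 15);
    // Yields [0, 30), [30, 60), [60, 100): the 10-offset tail [90, 100) is folded
    // into the last piece because 10 < minNumOffsetPerSplit (15).
    System.out.println(parts);
  }
}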
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
index 8f0083e..51e2b1a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/OffsetRangeTracker.java
@@ -26,9 +26,6 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A {@link RangeTracker} for non-negative positions of type {@code long}.
- *
- * <p>Not to be confused with {@link
- * org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker}.
  */
 public class OffsetRangeTracker implements RangeTracker<Long> {
   private static final Logger LOG = LoggerFactory.getLogger(OffsetRangeTracker.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
index d7e6cc8..c0990cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsFactory.java
@@ -184,20 +184,18 @@ public class PipelineOptionsFactory {
     private final String[] args;
     private final boolean validation;
     private final boolean strictParsing;
-    private final boolean isCli;
 
     // Do not allow direct instantiation
     private Builder() {
-      this(null, false, true, false);
+      this(null, false, true);
     }
 
     private Builder(String[] args, boolean validation,
-        boolean strictParsing, boolean isCli) {
+        boolean strictParsing) {
       this.defaultAppName = findCallersClassName();
       this.args = args;
       this.validation = validation;
       this.strictParsing = strictParsing;
-      this.isCli = isCli;
     }
 
     /**
@@ -239,7 +237,7 @@ public class PipelineOptionsFactory {
      */
     public Builder fromArgs(String... args) {
       checkNotNull(args, "Arguments should not be null.");
-      return new Builder(args, validation, strictParsing, true);
+      return new Builder(args, validation, strictParsing);
     }
 
     /**
@@ -249,7 +247,7 @@ public class PipelineOptionsFactory {
      * validation.
      */
     public Builder withValidation() {
-      return new Builder(args, true, strictParsing, isCli);
+      return new Builder(args, true, strictParsing);
     }
 
     /**
@@ -257,7 +255,7 @@ public class PipelineOptionsFactory {
      * arguments.
      */
     public Builder withoutStrictParsing() {
-      return new Builder(args, validation, false, isCli);
+      return new Builder(args, validation, false);
     }
 
     /**
@@ -302,11 +300,7 @@ public class PipelineOptionsFactory {
       }
 
       if (validation) {
-        if (isCli) {
-          PipelineOptionsValidator.validateCli(klass, t);
-        } else {
-          PipelineOptionsValidator.validate(klass, t);
-        }
+        PipelineOptionsValidator.validate(klass, t);
       }
       return t;
     }

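As a reminder of the public surface behind this Builder change, a minimal sketch of the usual parsing chain (fromArgs and withValidation appear in the hunk above; create() is the customary terminal call and is assumed here, not shown in the diff). After this revert, validation no longer takes a CLI-specific path:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

class OptionsParsingSketch {
  public static void main(String[] args) {
    // Parse command-line args into PipelineOptions and validate required values.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    System.out.println(options);
  }
}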
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
index fcffd74..bd54ec3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptionsValidator.java
@@ -43,29 +43,9 @@ public class PipelineOptionsValidator {
    *
    * @param klass The interface to fetch validation criteria from.
    * @param options The {@link PipelineOptions} to validate.
-   * @return Validated options.
+   * @return The type
    */
   public static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options) {
-    return validate(klass, options, false);
-  }
-
-  /**
-   * Validates that the passed {@link PipelineOptions} from command line interface (CLI)
-   * conforms to all the validation criteria from the passed in interface.
-   *
-   * <p>Note that the interface requested must conform to the validation criteria specified on
-   * {@link PipelineOptions#as(Class)}.
-   *
-   * @param klass The interface to fetch validation criteria from.
-   * @param options The {@link PipelineOptions} to validate.
-   * @return Validated options.
-   */
-  public static <T extends PipelineOptions> T validateCli(Class<T> klass, PipelineOptions options) {
-    return validate(klass, options, true);
-  }
-
-  private static <T extends PipelineOptions> T validate(Class<T> klass, PipelineOptions options,
-      boolean isCli) {
     checkNotNull(klass);
     checkNotNull(options);
     checkArgument(Proxy.isProxyClass(options.getClass()));
@@ -87,15 +67,9 @@ public class PipelineOptionsValidator {
             requiredGroups.put(requiredGroup, method);
           }
         } else {
-          if (isCli) {
-            checkArgument(handler.invoke(asClassOptions, method, null) != null,
-                "Missing required value for [--%s, \"%s\"]. ",
-                handler.getOptionName(method), getDescription(method));
-          } else {
-            checkArgument(handler.invoke(asClassOptions, method, null) != null,
-                "Missing required value for [%s, \"%s\"]. ",
-                method, getDescription(method));
-          }
+          checkArgument(handler.invoke(asClassOptions, method, null) != null,
+              "Missing required value for [%s, \"%s\"]. ",
+              method, getDescription(method));
         }
       }
     }
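
The validation reverted to here targets options interfaces such as the sketch
below (names illustrative); with the CLI-specific variant removed, a missing
required value is reported against the getter method rather than a --flag:

    public interface ExampleOptions extends PipelineOptions {
      @Description("Path to the input file")
      @Validation.Required
      String getInput();
      void setInput(String value);
    }

    // Validation fails with an IllegalArgumentException whose message is
    // roughly "Missing required value for [<getter method>, \"Path to the input file\"]."
    ExampleOptions opts = PipelineOptionsFactory.as(ExampleOptions.class);
    PipelineOptionsValidator.validate(ExampleOptions.class, opts);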

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
index 926a7b9..eda21a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ProxyInvocationHandler.java
@@ -45,8 +45,6 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.MutableClassToInstanceMap;
 import java.beans.PropertyDescriptor;
 import java.io.IOException;
-import java.io.NotSerializableException;
-import java.io.Serializable;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
@@ -89,7 +87,7 @@ import org.apache.beam.sdk.util.common.ReflectHelpers;
  * {@link PipelineOptions#as(Class)}.
  */
 @ThreadSafe
-class ProxyInvocationHandler implements InvocationHandler, Serializable {
+class ProxyInvocationHandler implements InvocationHandler {
   /**
    * No two instances of this class are considered equivalent hence we generate a random hash code.
    */
@@ -166,21 +164,6 @@ class ProxyInvocationHandler implements InvocationHandler, Serializable {
         + Arrays.toString(args) + "].");
   }
 
-  public String getOptionName(Method method) {
-    return gettersToPropertyNames.get(method.getName());
-  }
-
-  private void writeObject(java.io.ObjectOutputStream stream)
-      throws IOException {
-    throw new NotSerializableException(
-        "PipelineOptions objects are not serializable and should not be embedded into transforms "
-            + "(did you capture a PipelineOptions object in a field or in an anonymous class?). "
-            + "Instead, if you're using a DoFn, access PipelineOptions at runtime "
-            + "via ProcessContext/StartBundleContext/FinishBundleContext.getPipelineOptions(), "
-            + "or pre-extract necessary fields from PipelineOptions "
-            + "at pipeline construction time.");
-  }
-
   /**
    * Track whether options values are explicitly set, or retrieved from defaults.
    */
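
The removed writeObject guard pointed users at reading options from the
per-element context instead of capturing them in a DoFn field; that pattern
looks roughly like the sketch below (MyOptions and getPrefix() are
illustrative):

    class PrefixFn extends DoFn<String, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Fetch PipelineOptions at runtime rather than serializing them
        // inside the DoFn.
        MyOptions opts = c.getPipelineOptions().as(MyOptions.class);
        c.output(opts.getPrefix() + c.element());
      }
    }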

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..2f0e8ef 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -24,23 +24,21 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
@@ -70,7 +68,7 @@ public class TransformHierarchy {
     producers = new HashMap<>();
     producerInput = new HashMap<>();
     unexpandedInputs = new HashMap<>();
-    root = new Node();
+    root = new Node(null, null, "", null);
     current = root;
   }
 
@@ -145,6 +143,14 @@ public class TransformHierarchy {
       Node producerNode = getProducer(inputValue);
       PInput input = producerInput.remove(inputValue);
       inputValue.finishSpecifying(input, producerNode.getTransform());
+      checkState(
+          producers.get(inputValue) != null,
+          "Producer unknown for input %s",
+          inputValue);
+      checkState(
+          producers.get(inputValue) != null,
+          "Producer unknown for input %s",
+          inputValue);
     }
   }
 
@@ -159,7 +165,7 @@ public class TransformHierarchy {
    * nodes.
    */
   public void setOutput(POutput output) {
-    for (PCollection<?> value : fullyExpand(output).values()) {
+    for (PValue value : output.expand().values()) {
       if (!producers.containsKey(value)) {
         producers.put(value, current);
         value.finishSpecifyingOutput(
@@ -193,13 +199,13 @@ public class TransformHierarchy {
   }
 
   Node getProducer(PValue produced) {
-    return checkNotNull(producers.get(produced), "No producer found for %s", produced);
+    return producers.get(produced);
   }
 
   public Set<PValue> visit(PipelineVisitor visitor) {
     finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
-    root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
+    root.visit(visitor, visitedValues);
     return visitedValues;
   }
 
@@ -220,47 +226,6 @@ public class TransformHierarchy {
     return current;
   }
 
-  private Map<TupleTag<?>, PCollection<?>> fullyExpand(POutput output) {
-    Map<TupleTag<?>, PCollection<?>> result = new LinkedHashMap<>();
-    for (Map.Entry<TupleTag<?>, PValue> value : output.expand().entrySet()) {
-      if (value.getValue() instanceof PCollection) {
-        PCollection<?> previous = result.put(value.getKey(), (PCollection<?>) value.getValue());
-        checkArgument(
-            previous == null,
-            "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
-            output,
-            TupleTag.class.getSimpleName(),
-            value.getKey(),
-            previous,
-            value.getValue());
-      } else {
-        if (value.getValue().expand().size() == 1
-            && Iterables.getOnlyElement(value.getValue().expand().values())
-                .equals(value.getValue())) {
-          throw new IllegalStateException(
-              String.format(
-                  "Non %s %s that expands into itself %s",
-                  PCollection.class.getSimpleName(),
-                  PValue.class.getSimpleName(),
-                  value.getValue()));
-        }
-        for (Map.Entry<TupleTag<?>, PCollection<?>> valueComponent :
-            fullyExpand(value.getValue()).entrySet()) {
-          PCollection<?> previous = result.put(valueComponent.getKey(), valueComponent.getValue());
-          checkArgument(
-              previous == null,
-              "Found conflicting %ss in flattened expansion of %s: %s maps to %s and %s",
-              output,
-              TupleTag.class.getSimpleName(),
-              valueComponent.getKey(),
-              previous,
-              valueComponent.getValue());
-        }
-      }
-    }
-    return result;
-  }
-
   /**
    * Provides internal tracking of transform relationships with helper methods
    * for initialization and ordered visitation.
@@ -288,36 +253,25 @@ public class TransformHierarchy {
     boolean finishedSpecifying = false;
 
     /**
-     * Creates the root-level node. The root level node has a null enclosing node, a null transform,
-     * an empty map of inputs, and a name equal to the empty string.
-     */
-    private Node() {
-      this.enclosingNode = null;
-      this.transform = null;
-      this.fullName = "";
-      this.inputs = Collections.emptyMap();
-    }
-
-    /**
      * Creates a new Node with the given parent and transform.
      *
+     * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+     * nodes.
+     *
      * @param enclosingNode the composite node containing this node
      * @param transform the PTransform tracked by this node
      * @param fullName the fully qualified name of the transform
      * @param input the unexpanded input to the transform
      */
     private Node(
-        Node enclosingNode,
-        PTransform<?, ?> transform,
+        @Nullable Node enclosingNode,
+        @Nullable PTransform<?, ?> transform,
         String fullName,
-        PInput input) {
+        @Nullable PInput input) {
       this.enclosingNode = enclosingNode;
       this.transform = transform;
       this.fullName = fullName;
-      ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder();
-      inputs.putAll(input.expand());
-      inputs.putAll(transform.getAdditionalInputs());
-      this.inputs = inputs.build();
+      this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand();
     }
 
     /**
@@ -398,7 +352,7 @@ public class TransformHierarchy {
       return fullName;
     }
 
-    /** Returns the transform input, in fully expanded form. */
+    /** Returns the transform input, in unexpanded form. */
     public Map<TupleTag<?>, PValue> getInputs() {
       return inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
     }
@@ -505,60 +459,10 @@ public class TransformHierarchy {
     /**
      * Visit the transform node.
      *
-     * <p>The visit proceeds in the following order:
-     *
-     * <ul>
-     *   <li>Visit all input {@link PValue PValues} returned by the flattened expansion of {@link
-     *       Node#getInputs()}.
-     *   <li>If the node is a composite:
-     *       <ul>
-     *         <li>Enter the node via {@link PipelineVisitor#enterCompositeTransform(Node)}.
-     *         <li>If the result of {@link PipelineVisitor#enterCompositeTransform(Node)} was {@link
-     *             CompositeBehavior#ENTER_TRANSFORM}, visit each child node of this {@link Node}.
-     *         <li>Leave the node via {@link PipelineVisitor#leaveCompositeTransform(Node)}.
-     *       </ul>
-     *   <li>If the node is a primitive, visit it via {@link
-     *       PipelineVisitor#visitPrimitiveTransform(Node)}.
-     *   <li>Visit each {@link PValue} that was output by this node.
-     * </ul>
-     *
-     * <p>Additionally, the following ordering restrictions are observed:
-     *
-     * <ul>
-     *   <li>A {@link Node} will be visited after its enclosing node has been entered and before its
-     *       enclosing node has been left
-     *   <li>A {@link Node} will not be visited if any enclosing {@link Node} has returned {@link
-     *       CompositeBehavior#DO_NOT_ENTER_TRANSFORM} from the call to {@link
-     *       PipelineVisitor#enterCompositeTransform(Node)}.
-     *   <li>A {@link PValue} will only be visited after the {@link Node} that originally produced
-     *       it has been visited.
-     * </ul>
-     *
      * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
      * composite transforms), then the output values.
      */
-    private void visit(
-        PipelineVisitor visitor,
-        Set<PValue> visitedValues,
-        Set<Node> visitedNodes,
-        Set<Node> skippedComposites) {
-      if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
-        // Recursively enter all enclosing nodes, as appropriate.
-        getEnclosingNode().visit(visitor, visitedValues, visitedNodes, skippedComposites);
-      }
-      // These checks occur after visiting the enclosing node to ensure that if this node has been
-      // visited while visiting the enclosing node the node is not revisited, or, if an enclosing
-      // Node is skipped, this node is also skipped.
-      if (!visitedNodes.add(this)) {
-        LOG.debug("Not revisiting previously visited node {}", this);
-        return;
-      } else if (childNodeOf(skippedComposites)) {
-        // This node is a child of a node that has been passed over via CompositeBehavior, and
-        // should also be skipped. All child nodes of a skipped composite should always be skipped.
-        LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
-        return;
-      }
-
+    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
       if (!finishedSpecifying) {
         finishSpecifying();
       }
@@ -566,31 +470,22 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
-          Node valueProducer = getProducer(inputValue);
-          if (!visitedNodes.contains(valueProducer)) {
-            valueProducer.visit(visitor, visitedValues, visitedNodes, skippedComposites);
-          }
           if (visitedValues.add(inputValue)) {
-            LOG.debug("Visiting input value {}", inputValue);
-            visitor.visitValue(inputValue, valueProducer);
+            visitor.visitValue(inputValue, getProducer(inputValue));
           }
         }
       }
 
       if (isCompositeNode()) {
-        LOG.debug("Visiting composite node {}", this);
         PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
 
         if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
           for (Node child : parts) {
-            child.visit(visitor, visitedValues, visitedNodes, skippedComposites);
+            child.visit(visitor, visitedValues);
           }
-        } else {
-          skippedComposites.add(this);
         }
         visitor.leaveCompositeTransform(this);
       } else {
-        LOG.debug("Visiting primitive node {}", this);
         visitor.visitPrimitiveTransform(this);
       }
 
@@ -599,24 +494,12 @@ public class TransformHierarchy {
         // Visit outputs.
         for (PValue pValue : outputs.values()) {
           if (visitedValues.add(pValue)) {
-            LOG.debug("Visiting output value {}", pValue);
             visitor.visitValue(pValue, this);
           }
         }
       }
     }
 
-    private boolean childNodeOf(Set<Node> nodes) {
-      if (isRootNode()) {
-        return false;
-      }
-      Node parent = this.getEnclosingNode();
-      while (!parent.isRootNode() && !nodes.contains(parent)) {
-        parent = parent.getEnclosingNode();
-      }
-      return nodes.contains(parent);
-    }
-
     /**
      * Finish specifying a transform.
      *
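
The visit logic above is exercised through Pipeline.traverseTopologically; a
minimal visitor, sketched against the public PipelineVisitor.Defaults base
class, looks like this:

    // Prints the fully qualified name of every primitive transform in the
    // pipeline; composites are entered with the default ENTER_TRANSFORM.
    class NamePrinter extends Pipeline.PipelineVisitor.Defaults {
      @Override
      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        System.out.println(node.getFullName());
      }
    }

    pipeline.traverseTopologically(new NamePrinter());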

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index eba6978..c11057a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -126,9 +126,4 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
       }
     };
   }
-
-  @Override
-  public boolean assignsToOneWindow() {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
index d13fcf1..9ad8fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
@@ -271,18 +271,6 @@ public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
     return events;
   }
 
-  /**
-   * <b>For internal use only. No backwards-compatibility guarantees.</b>
-   *
-   * <p>Builds a test stream directly from events. No validation is performed on
-   * watermark monotonicity, etc. This is assumed to be a previously-serialized
-   * {@link TestStream} transform that is correct by construction.
-   */
-  @Internal
-  public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
-    return new TestStream<>(coder, events);
-  }
-
   @Override
   public boolean equals(Object other) {
     if (!(other instanceof TestStream)) {
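
With the internal fromRawEvents factory removed, a TestStream is built only
through the public builder, which does validate watermark monotonicity; a
small sketch:

    // Builder-based construction; the watermark may only advance, and
    // construction ends with advanceWatermarkToInfinity().
    TestStream<String> stream =
        TestStream.create(StringUtf8Coder.of())
            .addElements("a", "b")
            .advanceWatermarkTo(new Instant(100L))
            .addElements("c")
            .advanceWatermarkToInfinity();

    PCollection<String> input = p.apply(stream);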

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index d7effb5..9e1cc71 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.io.InputStream;
@@ -1121,7 +1122,11 @@ public class Combine {
      */
     @Override
     public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+      }
+      return additionalInputs.build();
     }
 
     /**
@@ -1272,15 +1277,14 @@ public class Combine {
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
       PCollection<OutputT> combined =
           input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-      PCollectionView<OutputT> view =
-          PCollectionViews.singletonView(
-              combined,
-              input.getWindowingStrategy(),
-              insertDefault,
-              insertDefault ? fn.defaultValue() : null,
-              combined.getCoder());
-      combined.apply(CreatePCollectionView.<OutputT, OutputT>of(view));
-      return view;
+      return combined.apply(
+          CreatePCollectionView.<OutputT, OutputT>of(
+              PCollectionViews.singletonView(
+                  combined,
+                  input.getWindowingStrategy(),
+                  insertDefault,
+                  insertDefault ? fn.defaultValue() : null,
+                  combined.getCoder())));
     }
 
     public int getFanout() {
@@ -1573,7 +1577,11 @@ public class Combine {
      */
     @Override
     public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+      }
+      return additionalInputs.build();
     }
 
     @Override
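
From the user's side, the singleton-view path touched above is reached via
asSingletonView() and consumed as a side input; a sketch, where numbers and
words are illustrative PCollections:

    final PCollectionView<Integer> totalView =
        numbers.apply(Sum.integersGlobally().asSingletonView());

    PCollection<String> labeled =
        words.apply(
            ParDo.of(new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // The combined total is read from the view at runtime.
                    c.output(c.element() + "/" + c.sideInput(totalView));
                  }
                })
                .withSideInputs(totalView));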

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 1b809c2..e711ac2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
@@ -386,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <pre><code>{@literal new DoFn<KV<Key, Foo>, Baz>()} {
    *
    *  {@literal @StateId("my-state-id")}
-   *  {@literal private final StateSpec<ValueState<MyState>>} myStateSpec =
+   *  {@literal private final StateSpec<K, ValueState<MyState>>} myStateSpec =
    *       StateSpecs.value(new MyStateCoder());
    *
    *  {@literal @ProcessElement}
@@ -546,15 +545,11 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *     returned by {@link GetInitialRestriction} implements {@link HasDefaultTracker}.
    * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
    * <li>The type of restrictions used by all of these methods must be the same.
-   * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
-   *     indicate whether there is more work to be done for the current element.
    * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
    *     {@link BoundedWindow}.
    * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
    *     {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
-   *     either of these, it's assumed to be {@link BoundedPerElement} if its {@link
-   *     ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
-   *     returns a {@link ProcessContinuation}.
+   *     either of these, it's assumed to be {@link BoundedPerElement}.
    * </ul>
    *
    * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
@@ -682,49 +677,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface UnboundedPerElement {}
 
-  // This can't be put into ProcessContinuation itself due to the following problem:
-  // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
-  private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
-      new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO);
-
-  /**
-   * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
-   * be done for the current element.
-   *
-   * <p>If the {@link ProcessElement} call completes because of a failed {@code tryClaim()} call
-   * on the {@link RestrictionTracker}, then the call MUST return {@link #stop()}.
-   */
-  @Experimental(Kind.SPLITTABLE_DO_FN)
-  @AutoValue
-  public abstract static class ProcessContinuation {
-    /** Indicates that there is no more work to be done for the current element. */
-    public static ProcessContinuation stop() {
-      return PROCESS_CONTINUATION_STOP;
-    }
-
-    /** Indicates that there is more work to be done for the current element. */
-    public static ProcessContinuation resume() {
-      return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO);
-    }
-
-    /**
-     * If false, the {@link DoFn} promises that there is no more work remaining for the current
-     * element, so the runner should not resume the {@link ProcessElement} call.
-     */
-    public abstract boolean shouldResume();
-
-    /**
-     * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
-     * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
-     */
-    public abstract Duration resumeDelay();
-
-    /** Builder method to set the value of {@link #resumeDelay()}. */
-    public ProcessContinuation withResumeDelay(Duration resumeDelay) {
-      return new AutoValue_DoFn_ProcessContinuation(shouldResume(), resumeDelay);
-    }
-  }
-
   /**
    * Finalize the {@link DoFn} construction to prepare for processing.
    * This method should be called by runners before any processing methods.
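
The @StateId javadoc reverted above pairs with a per-element state parameter
in @ProcessElement. The sketch below uses the single-type-parameter StateSpec
spelling from released Beam versions (the reverted javadoc shows the older
StateSpec<K, ...> form), so treat the field declaration as illustrative:

    class CountPerKeyFn extends DoFn<KV<String, String>, KV<String, Long>> {
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec =
          StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void processElement(
          ProcessContext c, @StateId("count") ValueState<Long> count) {
        // Read the running count for this key, bump it, and write it back.
        Long current = count.read();
        long seen = (current == null ? 0L : current) + 1;
        count.write(seen);
        c.output(KV.of(c.element().getKey(), seen));
      }
    }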

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index b2377dd..8a03f3c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -290,11 +290,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public PipelineOptions pipelineOptions() {
-              return getPipelineOptions();
-            }
-
-            @Override
             public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
                 DoFn<InputT, OutputT> doFn) {
               throw new UnsupportedOperationException(
@@ -551,6 +546,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       fn.super();
     }
 
+    private void throwUnsupportedOutputFromBundleMethods() {
+      throw new UnsupportedOperationException(
+          "DoFnTester doesn't support output from bundle methods");
+    }
+
     @Override
     public PipelineOptions getPipelineOptions() {
       return options;
@@ -559,13 +559,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     @Override
     public void output(
         OutputT output, Instant timestamp, BoundedWindow window) {
-      output(mainOutputTag, output, timestamp, window);
+      throwUnsupportedOutputFromBundleMethods();
     }
 
     @Override
     public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
-      getMutableOutput(tag)
-          .add(ValueInSingleWindow.of(output, timestamp, window, PaneInfo.NO_FIRING));
+      throwUnsupportedOutputFromBundleMethods();
     }
   }
 
@@ -643,6 +642,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
       getMutableOutput(tag)
           .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane()));
     }
+
+    private void throwUnsupportedOutputFromBundleMethods() {
+      throw new UnsupportedOperationException(
+          "DoFnTester doesn't support output from bundle methods");
+    }
+
   }
 
   @Override
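
After this change DoFnTester rejects output from the bundle-scoped contexts,
but the per-element path is unchanged; typical use in a unit test looks like:

    static class FormatFn extends DoFn<Integer, String> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output("n=" + c.element());
      }
    }

    DoFnTester<Integer, String> tester = DoFnTester.of(new FormatFn());
    // processBundle feeds elements through @ProcessElement and returns the
    // main-output values, here [n=1, n=2, n=3].
    List<String> outputs = tester.processBundle(1, 2, 3);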

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 0d03835..edf1419 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import java.io.Serializable;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
@@ -32,7 +33,6 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -50,7 +50,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -456,27 +455,6 @@ public class ParDo {
     }
   }
 
-  private static void validateStateApplicableForInput(
-      DoFn<?, ?> fn,
-      PCollection<?> input) {
-    Coder<?> inputCoder = input.getCoder();
-    checkArgument(
-        inputCoder instanceof KvCoder,
-        "%s requires its input to use %s in order to use state and timers.",
-        ParDo.class.getSimpleName(),
-        KvCoder.class.getSimpleName());
-
-    KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder;
-    try {
-        kvCoder.getKeyCoder().verifyDeterministic();
-    } catch (Coder.NonDeterministicException exc) {
-      throw new IllegalArgumentException(
-          String.format(
-              "%s requires a deterministic key coder in order to use state and timers",
-              ParDo.class.getSimpleName()));
-    }
-  }
-
   /**
    * Try to provide coders for as many of the type arguments of given
    * {@link DoFnSignature.StateDeclaration} as possible.
@@ -684,7 +662,11 @@ public class ParDo {
      */
     @Override
     public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+      }
+      return additionalInputs.build();
     }
   }
 
@@ -759,11 +741,6 @@ public class ParDo {
       // Use coder registry to determine coders for all StateSpec defined in the fn signature.
       finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
 
-      DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-      if (signature.usesState() || signature.usesTimers()) {
-        validateStateApplicableForInput(fn, input);
-      }
-
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
@@ -830,7 +807,11 @@ public class ParDo {
      */
     @Override
     public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-      return PCollectionViews.toAdditionalInputs(sideInputs);
+      ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
+      for (PCollectionView<?> sideInput : sideInputs) {
+        additionalInputs.put(sideInput.getTagInternal(), sideInput.getPCollection());
+      }
+      return additionalInputs.build();
     }
   }
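
The expansion shown above is the multi-output form of ParDo; it is reached via
withOutputTags, roughly as in this sketch (the tags, element types, and the
words PCollection are illustrative):

    final TupleTag<String> shortTag = new TupleTag<String>() {};
    final TupleTag<String> longTag = new TupleTag<String>() {};

    PCollectionTuple results =
        words.apply(
            ParDo.of(new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    if (c.element().length() < 5) {
                      c.output(c.element());          // main output (shortTag)
                    } else {
                      c.output(longTag, c.element()); // additional output
                    }
                  }
                })
                .withOutputTags(shortTag, TupleTagList.of(longTag)));

    PCollection<String> shortWords = results.get(shortTag);
    PCollection<String> longWords = results.get(longTag);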