Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/11 01:24:05 UTC

[2/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 511d697..b57b28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,27 +34,29 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
  * last) is terminated.
  */
-class TextSink extends FileBasedSink<String> {
+class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
-      FilenamePolicy filenamePolicy,
+      DynamicDestinations<UserT, DestinationT> dynamicDestinations,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
+    super(baseOutputFilename, dynamicDestinations, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
   }
+
   @Override
-  public WriteOperation<String> createWriteOperation() {
-    return new TextWriteOperation(this, header, footer);
+  public WriteOperation<String, DestinationT> createWriteOperation() {
+    return new TextWriteOperation<>(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
-  private static class TextWriteOperation extends WriteOperation<String> {
+  private static class TextWriteOperation<DestinationT>
+      extends WriteOperation<String, DestinationT> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -65,20 +67,20 @@ class TextSink extends FileBasedSink<String> {
     }
 
     @Override
-    public Writer<String> createWriter() throws Exception {
-      return new TextWriter(this, header, footer);
+    public Writer<String, DestinationT> createWriter() throws Exception {
+      return new TextWriter<>(this, header, footer);
     }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter extends Writer<String> {
+  private static class TextWriter<DestinationT> extends Writer<String, DestinationT> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        WriteOperation<String> writeOperation,
+        WriteOperation<String, DestinationT> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

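For orientation, a minimal sketch of what a DynamicDestinations implementation for this new hook could look like. It overrides only the two methods this patch actually calls, getDestination and getDefaultDestination; the class name, the "SEVERITY: message" record format, and the assumption that no further abstract methods need overriding are illustrative, not part of this change.

    // Hypothetical: route each log line to a per-severity destination string.
    class SeverityDestinations extends FileBasedSink.DynamicDestinations<String, String> {
      @Override
      public String getDestination(String element) {
        // Assumed record format "SEVERITY: message"; the prefix picks the file group.
        int i = element.indexOf(':');
        return (i < 0) ? getDefaultDestination() : element.substring(0, i);
      }

      @Override
      public String getDefaultDestination() {
        // Also used by WriteFiles when it writes extra empty shards.
        return "UNKNOWN";
      }
    }
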
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index a220eab..7013044 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,9 +20,12 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -30,8 +33,11 @@ import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -47,6 +53,7 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -55,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -62,6 +70,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.slf4j.Logger;
@@ -72,13 +81,12 @@ import org.slf4j.LoggerFactory;
  * global initialization of a sink, followed by a parallel write, and ends with a sequential
  * finalization of the write. The output of a write is {@link PDone}.
  *
- * <p>By default, every bundle in the input {@link PCollection} will be processed by a
- * {@link WriteOperation}, so the number of output
- * will vary based on runner behavior, though at least 1 output will always be produced. The
- * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
- * typically used to control how many files are produced or to globally limit the number of
- * workers connecting to an external service. However, this option can often hurt performance: it
- * adds an additional {@link GroupByKey} to the pipeline.
+ * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
+ * WriteOperation}, so the number of output files will vary based on runner behavior, though at least 1
+ * output will always be produced. The exact parallelism of the write stage can be controlled using
+ * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
+ * globally limit the number of workers connecting to an external service. However, this option can
+ * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
  *
  * <p>Example usage with runner-determined sharding:
  *
@@ -89,7 +97,8 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
+public class WriteFiles<UserT, DestinationT, OutputT>
+    extends PTransform<PCollection<UserT>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
   // The maximum number of file writers to keep open in a single bundle at a time, since file
@@ -105,12 +114,12 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<T> sink;
-  private WriteOperation<T> writeOperation;
+  private FileBasedSink<OutputT, DestinationT> sink;
+  private SerializableFunction<UserT, OutputT> formatFunction;
+  private WriteOperation<OutputT, DestinationT> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
-  @Nullable
-  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+  @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
   // We don't use a side input for static sharding, as we want this value to be updatable
   // when a pipeline is updated.
   @Nullable
@@ -122,19 +131,28 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
    * the runner control how many different shards are produced.
    */
-  public static <T> WriteFiles<T> to(FileBasedSink<T> sink) {
+  public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
+      FileBasedSink<OutputT, DestinationT> sink,
+      SerializableFunction<UserT, OutputT> formatFunction) {
     checkNotNull(sink, "sink");
-    return new WriteFiles<>(sink, null /* runner-determined sharding */, null,
-        false, DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+    return new WriteFiles<>(
+        sink,
+        formatFunction,
+        null /* runner-determined sharding */,
+        null,
+        false,
+        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
   }
 
   private WriteFiles(
-      FileBasedSink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+      FileBasedSink<OutputT, DestinationT> sink,
+      SerializableFunction<UserT, OutputT> formatFunction,
+      @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
       @Nullable ValueProvider<Integer> numShardsProvider,
       boolean windowedWrites,
       int maxNumWritersPerBundle) {
     this.sink = sink;
+    this.formatFunction = checkNotNull(formatFunction);
     this.computeNumShards = computeNumShards;
     this.numShardsProvider = numShardsProvider;
     this.windowedWrites = windowedWrites;
@@ -142,7 +160,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   @Override
-  public PDone expand(PCollection<T> input) {
+  public PDone expand(PCollection<UserT> input) {
     if (input.isBounded() == IsBounded.UNBOUNDED) {
       checkArgument(windowedWrites,
           "Must use windowed writes when applying %s to an unbounded PCollection",
@@ -181,13 +199,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     }
   }
 
-  /**
-   * Returns the {@link FileBasedSink} associated with this PTransform.
-   */
-  public FileBasedSink<T> getSink() {
+  /** Returns the {@link FileBasedSink} associated with this PTransform. */
+  public FileBasedSink<OutputT, DestinationT> getSink() {
     return sink;
   }
 
+  /** Returns the format function that maps the user type to the record written to files. */
+  public SerializableFunction<UserT, OutputT> getFormatFunction() {
+    return formatFunction;
+  }
+
   /**
    * Returns whether or not to perform windowed writes.
    */
@@ -202,7 +223,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * #withRunnerDeterminedSharding()}.
    */
   @Nullable
-  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+  public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
     return computeNumShards;
   }
 
@@ -220,7 +241,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * <p>A value less than or equal to 0 will be equivalent to the default behavior of
    * runner-determined sharding.
    */
-  public WriteFiles<T> withNumShards(int numShards) {
+  public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) {
     if (numShards > 0) {
       return withNumShards(StaticValueProvider.of(numShards));
     }
@@ -234,16 +255,26 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
    * more information.
    */
-  public WriteFiles<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
-    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+  public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
+      ValueProvider<Integer> numShardsProvider) {
+    return new WriteFiles<>(
+        sink,
+        formatFunction,
+        computeNumShards,
+        numShardsProvider,
+        windowedWrites,
         maxNumWritersPerBundle);
   }
 
-  /**
-   * Set the maximum number of writers created in a bundle before spilling to shuffle.
-   */
-  public WriteFiles<T> withMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
-    return new WriteFiles<>(sink, null, numShardsProvider, windowedWrites,
+  /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
+  public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
+      int maxNumWritersPerBundle) {
+    return new WriteFiles<>(
+        sink,
+        formatFunction,
+        computeNumShards,
+        numShardsProvider,
+        windowedWrites,
         maxNumWritersPerBundle);
   }
 
@@ -254,97 +285,167 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
    * more information.
    */
-  public WriteFiles<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+  public WriteFiles<UserT, DestinationT, OutputT> withSharding(
+      PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new WriteFiles<>(sink, sharding, null, windowedWrites, maxNumWritersPerBundle);
+    return new WriteFiles<>(
+        sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
   }
 
   /**
    * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
    * runner-determined sharding.
    */
-  public WriteFiles<T> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle);
+  public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
+    return new WriteFiles<>(
+        sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
   }
 
   /**
   * Returns a new {@link WriteFiles} that preserves windowing on its input.
    *
-   * <p>If this option is not specified, windowing and triggering are replaced by
-   * {@link GlobalWindows} and {@link DefaultTrigger}.
+   * <p>If this option is not specified, windowing and triggering are replaced by {@link
+   * GlobalWindows} and {@link DefaultTrigger}.
    *
-   * <p>If there is no data for a window, no output shards will be generated for that window.
-   * If a window triggers multiple times, then more than a single output shard might be
-   * generated multiple times; it's up to the sink implementation to keep these output shards
-   * unique.
+   * <p>If there is no data for a window, no output shards will be generated for that window. If a
+   * window triggers multiple times, output shards for that window may be generated more than once;
+   * it's up to the sink implementation to keep these output shards unique.
    *
-   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
-   * positive value.
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
    */
-  public WriteFiles<T> withWindowedWrites() {
-    return new WriteFiles<>(sink, computeNumShards, numShardsProvider, true,
-        maxNumWritersPerBundle);
+  public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
+    return new WriteFiles<>(
+        sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+  }
+
+  private static class WriterKey<DestinationT> {
+    private final BoundedWindow window;
+    private final PaneInfo paneInfo;
+    private final DestinationT destination;
+
+    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
+      this.window = window;
+      this.paneInfo = paneInfo;
+      this.destination = destination;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof WriterKey)) {
+        return false;
+      }
+      WriterKey other = (WriterKey) o;
+      return Objects.equal(window, other.window)
+          && Objects.equal(paneInfo, other.paneInfo)
+          && Objects.equal(destination, other.destination);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(window, paneInfo, destination);
+    }
+  }
+
+  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
+  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
+  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
+  // this can be used as a key.
+  private static <DestinationT> int hashDestination(
+      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
+    return Hashing.murmur3_32()
+        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
+        .asInt();
   }
 
   /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes enabled.
+   * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
+   * WriteOperation} associated with the {@link FileBasedSink}.
    */
-  private class WriteWindowedBundles extends DoFn<T, FileResult> {
-    private final TupleTag<KV<Integer, T>> unwrittedRecordsTag;
-    private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
-    int spilledShardNum = UNKNOWN_SHARDNUM;
-
-    WriteWindowedBundles(TupleTag<KV<Integer, T>> unwrittedRecordsTag) {
-      this.unwrittedRecordsTag = unwrittedRecordsTag;
+  private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
+    private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
+    private final Coder<DestinationT> destinationCoder;
+    private final boolean windowedWrites;
+
+    private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
+    private int spilledShardNum = UNKNOWN_SHARDNUM;
+
+    WriteBundles(
+        boolean windowedWrites,
+        TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
+        Coder<DestinationT> destinationCoder) {
+      this.windowedWrites = windowedWrites;
+      this.unwrittenRecordsTag = unwrittenRecordsTag;
+      this.destinationCoder = destinationCoder;
     }
 
     @StartBundle
     public void startBundle(StartBundleContext c) {
       // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
-      windowedWriters = Maps.newHashMap();
+      writers = Maps.newHashMap();
     }
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       PaneInfo paneInfo = c.pane();
-      Writer<T> writer;
       // If we are doing windowed writes, we need to ensure that we have separate files for
-      // data in different windows/panes.
-      KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
-      writer = windowedWriters.get(key);
+      // data in different windows/panes. Similarly, for dynamic destinations, records for
+      // different destinations go to different writers.
+      // In the case of unwindowed writes, the window and the pane will always be the same, and
+      // the map will only have a single element.
+      DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
+      WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
+      Writer<OutputT, DestinationT> writer = writers.get(key);
       if (writer == null) {
-        if (windowedWriters.size() <= maxNumWritersPerBundle) {
+        if (writers.size() <= maxNumWritersPerBundle) {
           String uuid = UUID.randomUUID().toString();
           LOG.info(
-              "Opening writer {} for write operation {}, window {} pane {}",
+              "Opening writer {} for write operation {}, window {} pane {} destination {}",
               uuid,
               writeOperation,
               window,
-              paneInfo);
+              paneInfo,
+              destination);
           writer = writeOperation.createWriter();
-          writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM);
-          windowedWriters.put(key, writer);
+          if (windowedWrites) {
+            writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
+          } else {
+            writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
+          }
+          writers.put(key, writer);
           LOG.debug("Done opening writer");
         } else {
           if (spilledShardNum == UNKNOWN_SHARDNUM) {
+            // Cache the random value so we only call ThreadLocalRandom once per DoFn instance.
             spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
           } else {
             spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
           }
-          c.output(unwrittedRecordsTag, KV.of(spilledShardNum, c.element()));
+          c.output(
+              unwrittenRecordsTag,
+              KV.of(
+                  ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
+                  c.element()));
           return;
         }
       }
-      writeOrClose(writer, c.element());
+      writeOrClose(writer, formatFunction.apply(c.element()));
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry : windowedWriters.entrySet()) {
-        FileResult result = entry.getValue().close();
-        BoundedWindow window = entry.getKey().getKey();
+      for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
+          writers.entrySet()) {
+        Writer<OutputT, DestinationT> writer = entry.getValue();
+        FileResult<DestinationT> result;
+        try {
+          result = writer.close();
+        } catch (Exception e) {
+          // If anything goes wrong, make sure to delete the temporary file.
+          writer.cleanup();
+          throw e;
+        }
+        BoundedWindow window = entry.getKey().window;
         c.output(result, window.maxTimestamp(), window);
       }
     }
@@ -355,90 +456,62 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     }
   }
 
-  /**
-   * Writes all the elements in a bundle using a {@link Writer} produced by the
-   * {@link WriteOperation} associated with the {@link FileBasedSink} with windowed writes disabled.
-   */
-  private class WriteUnwindowedBundles extends DoFn<T, FileResult> {
-    // Writer that will write the records in this bundle. Lazily
-    // initialized in processElement.
-    private Writer<T> writer = null;
-    private BoundedWindow window = null;
-
-    @StartBundle
-    public void startBundle(StartBundleContext c) {
-      // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
-      writer = null;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Cache a single writer for the bundle.
-      if (writer == null) {
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter();
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
-        LOG.debug("Done opening writer");
-      }
-      this.window = window;
-      writeOrClose(this.writer, c.element());
-    }
+  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
 
-    @FinishBundle
-    public void finishBundle(FinishBundleContext c) throws Exception {
-      if (writer == null) {
-        return;
-      }
-      FileResult result = writer.close();
-      c.output(result, window.maxTimestamp(), window);
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(WriteFiles.this);
-    }
-  }
-
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING };
-
-  /**
-   * Like {@link WriteWindowedBundles} and {@link WriteUnwindowedBundles}, but where the elements
-   * for each shard have been collected into a single iterable.
+  /**
+   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
+   * single iterable.
    */
-  private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
+  private class WriteShardedBundles
+      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
     ShardAssignment shardNumberAssignment;
     WriteShardedBundles(ShardAssignment shardNumberAssignment) {
       this.shardNumberAssignment = shardNumberAssignment;
     }
+
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // In a sharded write, single input element represents one shard. We can open and close
-      // the writer in each call to processElement.
-      LOG.info("Opening writer for write operation {}", writeOperation);
-      Writer<T> writer = writeOperation.createWriter();
-      if (windowedWrites) {
-        int shardNumber = shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-            ? c.element().getKey() : UNKNOWN_SHARDNUM;
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), shardNumber);
-      } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
-      }
-      LOG.debug("Done opening writer");
-
-      try {
-        for (T t : c.element().getValue()) {
-          writeOrClose(writer, t);
+      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
+      // in this iterable. The number of destinations is generally very small (1000s or less), so
+      // there will rarely be hash collisions.
+      Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
+      for (UserT input : c.element().getValue()) {
+        DestinationT destination = sink.getDynamicDestinations().getDestination(input);
+        Writer<OutputT, DestinationT> writer = writers.get(destination);
+        if (writer == null) {
+          LOG.debug("Opening writer for write operation {}", writeOperation);
+          writer = writeOperation.createWriter();
+          if (windowedWrites) {
+            int shardNumber =
+                shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+                    ? c.element().getKey().getShardNumber()
+                    : UNKNOWN_SHARDNUM;
+            writer.openWindowed(
+                UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
+          } else {
+            writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+          }
+          LOG.debug("Done opening writer");
+          writers.put(destination, writer);
+        }
+        writeOrClose(writer, formatFunction.apply(input));
         }
 
-        // Close the writer; if this throws let the error propagate.
-        FileResult result = writer.close();
-        c.output(result);
-      } catch (Exception e) {
-        // If anything goes wrong, make sure to delete the temporary file.
-        writer.cleanup();
-        throw e;
+      // Close all writers.
+      for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
+        Writer<OutputT, DestinationT> writer = entry.getValue();
+        FileResult<DestinationT> result;
+        try {
+          // Close the writer; if this throws let the error propagate.
+          result = writer.close();
+          c.output(result);
+        } catch (Exception e) {
+          // If anything goes wrong, make sure to delete the temporary file.
+          writer.cleanup();
+          throw e;
+        }
+      }
       }
-    }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
@@ -446,12 +519,15 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     }
   }
 
-  private static <T> void writeOrClose(Writer<T> writer, T t) throws Exception {
+  private static <OutputT, DestinationT> void writeOrClose(
+      Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
     try {
       writer.write(t);
     } catch (Exception e) {
       try {
         writer.close();
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
       } catch (Exception closeException) {
         if (closeException instanceof InterruptedException) {
           // Do not silently ignore interrupted state.
@@ -464,20 +540,25 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     }
   }
 
-  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+  private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
     private final PCollectionView<Integer> numShardsView;
     private final ValueProvider<Integer> numShardsProvider;
+    private final Coder<DestinationT> destinationCoder;
+
     private int shardNumber;
 
-    ApplyShardingKey(PCollectionView<Integer> numShardsView,
-                     ValueProvider<Integer> numShardsProvider) {
+    ApplyShardingKey(
+        PCollectionView<Integer> numShardsView,
+        ValueProvider<Integer> numShardsProvider,
+        Coder<DestinationT> destinationCoder) {
+      this.destinationCoder = destinationCoder;
       this.numShardsView = numShardsView;
       this.numShardsProvider = numShardsProvider;
       shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext context) {
+    public void processElement(ProcessContext context) throws IOException {
       final int shardCount;
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
@@ -497,86 +578,110 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       } else {
         shardNumber = (shardNumber + 1) % shardCount;
       }
-      context.output(KV.of(shardNumber, context.element()));
+      // We avoid using destination itself as a sharding key, because destination is often large.
+      // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path
+      // to the file. Often most of the path is constant across all destinations, just the path
+      // suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully
+      // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
+      // the destinations. This does mean that multiple destinations might end up on the same shard,
+      // however the number of collisions should be small, so there's no need to worry about memory
+      // issues.
+      DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
+      context.output(
+          KV.of(
+              ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
+              context.element()));
     }
   }
 
   /**
    * A write is performed as sequence of three {@link ParDo}'s.
    *
-   * <p>This singleton collection containing the WriteOperation is then used as a side
-   * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
-   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in
-   * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
-   * {@link Writer#write} method is called for every element in the bundle. The output
-   * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link FileBasedSink}
-   * for a description of writer results)-one for each bundle.
+   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+   * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
+   * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
+   * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
+   * respectively, and {@link Writer#write} method is called for every element in the bundle. The
+   * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
+   * FileBasedSink} for a description of writer results), one for each bundle.
    *
   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
-   * to finalize the write.
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
+   * the write.
    *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be
-   * called before the exception that caused the write to fail is propagated and the write result
-   * will be discarded.
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+   * before the exception that caused the write to fail is propagated and the write result will be
+   * discarded.
    *
    * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
    * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter
-   * phases. However, the WriteOperation is not serialized after the bundle-writing
-   * phase. This is why implementations should guarantee that
-   * {@link WriteOperation#createWriter} does not mutate WriteOperation).
+   * WriteOperation object that occurs during initialization is visible in the latter phases.
+   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
+   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+   * WriteOperation.
    */
-  private PDone createWrite(PCollection<T> input) {
+  private PDone createWrite(PCollection<UserT> input) {
     Pipeline p = input.getPipeline();
 
     if (!windowedWrites) {
       // Re-window the data into the global window and remove any existing triggers.
       input =
           input.apply(
-              Window.<T>into(new GlobalWindows())
+              Window.<UserT>into(new GlobalWindows())
                   .triggering(DefaultTrigger.of())
                   .discardingFiredPanes());
     }
 
-
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the
     // WriteOperation as a side input) and collect the results of the writes in a
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollection<FileResult> results;
+    PCollection<FileResult<DestinationT>> results;
     final PCollectionView<Integer> numShardsView;
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+    final Coder<DestinationT> destinationCoder;
+    try {
+      destinationCoder =
+          sink.getDynamicDestinations()
+              .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+      destinationCoder.verifyDeterministic();
+    } catch (CannotProvideCoderException | NonDeterministicException e) {
+      throw new RuntimeException(e);
+    }
+
     if (computeNumShards == null && numShardsProvider == null) {
       numShardsView = null;
-      if (windowedWrites) {
-        TupleTag<FileResult> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
-        TupleTag<KV<Integer, T>> unwrittedRecordsTag = new TupleTag<>("unwrittenRecordsTag");
-        PCollectionTuple writeTuple = input.apply("WriteWindowedBundles", ParDo.of(
-            new WriteWindowedBundles(unwrittedRecordsTag))
-            .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
-        PCollection<FileResult> writtenBundleFiles = writeTuple.get(writtenRecordsTag)
-            .setCoder(FileResultCoder.of(shardedWindowCoder));
-        // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
-        // finalize to stay consistent with what WriteWindowedBundles does.
-        PCollection<FileResult> writtenGroupedFiles =
-            writeTuple
-                .get(unwrittedRecordsTag)
-                .setCoder(KvCoder.of(VarIntCoder.of(), input.getCoder()))
-                .apply("GroupUnwritten", GroupByKey.<Integer, T>create())
-                .apply("WriteUnwritten", ParDo.of(
-                    new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
-                .setCoder(FileResultCoder.of(shardedWindowCoder));
-        results = PCollectionList.of(writtenBundleFiles).and(writtenGroupedFiles)
-            .apply(Flatten.<FileResult>pCollections());
-      } else {
-        results =
-            input.apply("WriteUnwindowedBundles", ParDo.of(new WriteUnwindowedBundles()));
-      }
+      TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecordsTag");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
+          new TupleTag<>("unwrittenRecordsTag");
+      String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
+      PCollectionTuple writeTuple =
+          input.apply(
+              writeName,
+              ParDo.of(new WriteBundles(windowedWrites, unwrittenRecordsTag, destinationCoder))
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
+      PCollection<FileResult<DestinationT>> writtenBundleFiles =
+          writeTuple
+              .get(writtenRecordsTag)
+              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+      // finalize to stay consistent with what WriteBundles does.
+      PCollection<FileResult<DestinationT>> writtenGroupedFiles =
+          writeTuple
+              .get(unwrittenRecordsTag)
+              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply(
+                  "WriteUnwritten",
+                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+      results =
+          PCollectionList.of(writtenBundleFiles)
+              .and(writtenGroupedFiles)
+              .apply(Flatten.<FileResult<DestinationT>>pCollections());
     } else {
       List<PCollectionView<?>> sideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
@@ -585,23 +690,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       } else {
         numShardsView = null;
       }
-
-      PCollection<KV<Integer, Iterable<T>>> sharded =
+      PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
           input
-              .apply("ApplyShardLabel", ParDo.of(
-                  new ApplyShardingKey<T>(numShardsView,
-                      (numShardsView != null) ? null : numShardsProvider))
-                  .withSideInputs(sideInputs))
-              .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
+              .apply(
+                  "ApplyShardLabel",
+                  ParDo.of(
+                          new ApplyShardingKey(
+                              numShardsView,
+                              (numShardsView != null) ? null : numShardsProvider,
+                              destinationCoder))
+                      .withSideInputs(sideInputs))
+              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
+      shardedWindowCoder =
+          (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
       // Since this path might be used by streaming runners processing triggers, it's important
       // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
       // strategy works by sorting all FileResult objects and assigning them numbers, which is not
       // guaranteed to work well when processing triggers - if the finalize step retries it might
       // see a different Iterable of FileResult objects, and it will assign different shard numbers.
-      results = sharded.apply("WriteShardedBundles",
-          ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+      results =
+          sharded.apply(
+              "WriteShardedBundles",
+              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
     }
-    results.setCoder(FileResultCoder.of(shardedWindowCoder));
+    results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
 
     if (windowedWrites) {
       // When processing streaming windowed writes, results will arrive multiple times. This
@@ -609,26 +722,31 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // as new data arriving into a side input does not trigger the listening DoFn. Instead
       // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
       // whenever new data arrives.
-      PCollection<KV<Void, FileResult>> keyedResults =
-          results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
-      keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
-          FileResultCoder.of(shardedWindowCoder)));
+      PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
+          results.apply(
+              "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
+      keyedResults.setCoder(
+          KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
 
       // Is the continuation trigger sufficient?
       keyedResults
-          .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult>create())
-          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<FileResult>>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<FileResult> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results);
-              LOG.debug("Done finalizing write operation");
-            }
-          }));
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
+          .apply(
+              "Finalize",
+              ParDo.of(
+                  new DoFn<KV<Void, Iterable<FileResult<DestinationT>>>, Integer>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) throws Exception {
+                      LOG.info("Finalizing write operation {}.", writeOperation);
+                      List<FileResult<DestinationT>> results =
+                          Lists.newArrayList(c.element().getValue());
+                      writeOperation.finalize(results);
+                      LOG.debug("Done finalizing write operation");
+                    }
+                  }));
     } else {
-      final PCollectionView<Iterable<FileResult>> resultsView =
-          results.apply(View.<FileResult>asIterable());
+      final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
+          results.apply(View.<FileResult<DestinationT>>asIterable());
       ImmutableList.Builder<PCollectionView<?>> sideInputs =
           ImmutableList.<PCollectionView<?>>builder().add(resultsView);
       if (numShardsView != null) {
@@ -644,41 +762,53 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // set numShards, then all shards will be written out as empty files. For this reason we
       // use a side input here.
       PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
-      singletonCollection
-          .apply("Finalize", ParDo.of(new DoFn<Void, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<FileResult> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if
-              // set.
-              int minShardsNeeded;
-              if (numShardsView != null) {
-                minShardsNeeded = c.sideInput(numShardsView);
-              } else if (numShardsProvider != null) {
-                minShardsNeeded = numShardsProvider.get();
-              } else {
-                minShardsNeeded = 1;
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written for a total of {}.",
-                    extraShardsNeeded, results.size(), minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T> writer = writeOperation.createWriter();
-                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
-                  FileResult emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
-              writeOperation.finalize(results);
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
+      singletonCollection.apply(
+          "Finalize",
+          ParDo.of(
+                  new DoFn<Void, Integer>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) throws Exception {
+                      LOG.info("Finalizing write operation {}.", writeOperation);
+                      List<FileResult<DestinationT>> results =
+                          Lists.newArrayList(c.sideInput(resultsView));
+                      LOG.debug(
+                          "Side input initialized to finalize write operation {}.", writeOperation);
+
+                      // We must always output at least 1 shard, and honor the
+                      // user-specified numShards if set.
+                      int minShardsNeeded;
+                      if (numShardsView != null) {
+                        minShardsNeeded = c.sideInput(numShardsView);
+                      } else if (numShardsProvider != null) {
+                        minShardsNeeded = numShardsProvider.get();
+                      } else {
+                        minShardsNeeded = 1;
+                      }
+                      int extraShardsNeeded = minShardsNeeded - results.size();
+                      if (extraShardsNeeded > 0) {
+                        LOG.info(
+                            "Creating {} empty output shards in addition to {} written "
+                                + "for a total of {}.",
+                            extraShardsNeeded,
+                            results.size(),
+                            minShardsNeeded);
+                        for (int i = 0; i < extraShardsNeeded; ++i) {
+                          Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+                          writer.openUnwindowed(
+                              UUID.randomUUID().toString(),
+                              UNKNOWN_SHARDNUM,
+                              sink.getDynamicDestinations().getDefaultDestination());
+                          FileResult<DestinationT> emptyWrite = writer.close();
+                          results.add(emptyWrite);
+                        }
+                        LOG.debug("Done creating extra shards.");
+                      }
+                      writeOperation.finalize(results);
+                      LOG.debug("Done finalizing write operation {}", writeOperation);
+                    }
+                  })
+              .withSideInputs(sideInputs.build()));
     }
     return PDone.in(input.getPipeline());
   }

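The spill and sharding paths above key their GroupByKey on ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber) instead of on the destination itself. A self-contained sketch of that hashing idea, with an illustrative string destination and coder:

    import com.google.common.hash.Hashing;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    public class StableDestinationHash {
      // Mirrors hashDestination(): encode with a deterministic coder, then hash the
      // bytes with murmur3_32, so the key is identical on every worker. Object.hashCode()
      // of an arbitrary destination type carries no such cross-JVM guarantee.
      static <DestinationT> int stableHash(DestinationT destination, Coder<DestinationT> coder)
          throws Exception {
        return Hashing.murmur3_32()
            .hashBytes(CoderUtils.encodeToByteArray(coder, destination))
            .asInt();
      }

      public static void main(String[] args) throws Exception {
        // Illustrative destination; any type with a deterministic coder works.
        System.out.println(stableHash("logs/severity=ERROR", StringUtf8Coder.of()));
      }
    }
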
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
new file mode 100644
index 0000000..d057d81
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.transforms;
+
+/** Useful {@link SerializableFunction} implementations. */
+public class SerializableFunctions {
+  private static class Identity<T> implements SerializableFunction<T, T> {
+    @Override
+    public T apply(T input) {
+      return input;
+    }
+  }
+
+  private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> {
+    OutT value;
+
+    Constant(OutT value) {
+      this.value = value;
+    }
+
+    @Override
+    public OutT apply(InT input) {
+      return value;
+    }
+  }
+
+  public static <T> SerializableFunction<T, T> identity() {
+    return new Identity<>();
+  }
+
+  public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) {
+    return new Constant<>(value);
+  }
+}

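A short usage sketch of the two helpers (the demo class is ours). identity() is the natural format function to pass to the new WriteFiles.to(sink, formatFunction) when the user type and the written record type coincide:

    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.SerializableFunctions;

    public class SerializableFunctionsDemo {
      public static void main(String[] args) {
        SerializableFunction<String, String> id = SerializableFunctions.<String>identity();
        SerializableFunction<Long, String> fixed =
            SerializableFunctions.<Long, String>constant("n/a");

        System.out.println(id.apply("hello")); // prints "hello"
        System.out.println(fixed.apply(42L));  // prints "n/a"
      }
    }
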
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
new file mode 100644
index 0000000..e56af13
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** A key and a shard number. */
+public class ShardedKey<K> implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final K key;
+  private final int shardNumber;
+
+  public static <K> ShardedKey<K> of(K key, int shardNumber) {
+    return new ShardedKey<>(key, shardNumber);
+  }
+
+  private ShardedKey(K key, int shardNumber) {
+    this.key = key;
+    this.shardNumber = shardNumber;
+  }
+
+  public K getKey() {
+    return key;
+  }
+
+  public int getShardNumber() {
+    return shardNumber;
+  }
+
+  @Override
+  public String toString() {
+    return "key: " + key + " shard: " + shardNumber;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof ShardedKey)) {
+      return false;
+    }
+    ShardedKey<K> other = (ShardedKey<K>) o;
+    return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, shardNumber);
+  }
+}

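Because ShardedKey is used as a GroupByKey key, its value semantics matter: two instances with equal key and shard number must compare equal and hash identically. A minimal check (demo class name is ours):

    import org.apache.beam.sdk.values.ShardedKey;

    public class ShardedKeyDemo {
      public static void main(String[] args) {
        ShardedKey<Integer> a = ShardedKey.of(7, 3);
        ShardedKey<Integer> b = ShardedKey.of(7, 3);

        System.out.println(a);                            // key: 7 shard: 3
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
      }
    }
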
http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 6d01d32..260e47a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,10 +54,11 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -276,37 +277,42 @@ public class AvroIOTest {
   }
 
   private static class WindowedFilenamePolicy extends FilenamePolicy {
-    final String outputFilePrefix;
+    final ResourceId outputFilePrefix;
 
-    WindowedFilenamePolicy(String outputFilePrefix) {
+    WindowedFilenamePolicy(ResourceId outputFilePrefix) {
       this.outputFilePrefix = outputFilePrefix;
     }
 
     @Override
-    public ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext input, String extension) {
-      String filename = String.format(
-          "%s-%s-%s-of-%s-pane-%s%s%s",
-          outputFilePrefix,
-          input.getWindow(),
-          input.getShardNumber(),
-          input.getNumShards() - 1,
-          input.getPaneInfo().getIndex(),
-          input.getPaneInfo().isLast() ? "-final" : "",
-          extension);
-      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) {
+      String filenamePrefix =
+          outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
+
+      String filename =
+          String.format(
+              "%s-%s-%s-of-%s-pane-%s%s%s",
+              filenamePrefix,
+              input.getWindow(),
+              input.getShardNumber(),
+              input.getNumShards() - 1,
+              input.getPaneInfo().getIndex(),
+              input.getPaneInfo().isLast() ? "-final" : "",
+              outputFileHints.getSuggestedFilenameSuffix());
+      return outputFilePrefix
+          .getCurrentDirectory()
+          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(
-        ResourceId outputDirectory, Context input, String extension) {
+    public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Expecting windowed outputs only");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
-          .withLabel("File Name Prefix"));
+      builder.add(
+          DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
+              .withLabel("File Name Prefix"));
     }
   }
 
@@ -359,15 +365,18 @@ public class AvroIOTest {
         Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
         .advanceWatermarkToInfinity();
 
-    FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
+    FilenamePolicy policy =
+        new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(AvroIO.write(GenericClass.class)
-            .to(baseFilename)
-            .withFilenamePolicy(policy)
-            .withWindowedWrites()
-            .withNumShards(2));
+        .apply(
+            AvroIO.write(GenericClass.class)
+                .to(policy)
+                .withTempDirectory(
+                    StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true)))
+                .withWindowedWrites()
+                .withNumShards(2));
     windowedAvroWritePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order
@@ -494,13 +503,14 @@ public class AvroIOTest {
       expectedFiles.add(
           new File(
               DefaultFilenamePolicy.constructName(
-                  outputFilePrefix,
-                  shardNameTemplate,
-                  "" /* no suffix */,
-                  i,
-                  numShards,
-                  null,
-                  null)));
+                      FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
+                      shardNameTemplate,
+                      "" /* no suffix */,
+                      i,
+                      numShards,
+                      null,
+                      null)
+                  .toString()));
     }
 
     List<String> actualElements = new ArrayList<>();
@@ -572,15 +582,4 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
-
-  @Test
-  public void testWindowedWriteRequiresFilenamePolicy() {
-    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
-    AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
-
-    expectedException.expect(IllegalStateException.class);
-    expectedException.expectMessage(
-        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
-    emptyInput.apply(write);
-  }
 }
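
The reworked WindowedFilenamePolicy above derives the output directory from the
prefix ResourceId itself rather than receiving it as a parameter. A rough sketch
of the names it produces, with the prefix and window rendering assumed for
illustration:

    // For prefix /path/to/out, shard 0 of 2, pane index 0 marked final, and an
    // .avro suffix hint, the format string above yields roughly:
    //   /path/to/out-[windowStart..windowEnd)-0-of-1-pane-0-final.avro
    // Note the "of-1": the policy prints getNumShards() - 1, not the count.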

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index 217420c..9dc6d33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
 import static org.junit.Assert.assertEquals;
 
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -30,69 +30,108 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class DefaultFilenamePolicyTest {
 
+  private static String constructName(
+      String baseFilename,
+      String shardTemplate,
+      String suffix,
+      int shardNum,
+      int numShards,
+      String paneStr,
+      String windowStr) {
+    ResourceId constructed =
+        DefaultFilenamePolicy.constructName(
+            FileSystems.matchNewResource(baseFilename, false),
+            shardTemplate,
+            suffix,
+            shardNum,
+            numShards,
+            paneStr,
+            windowStr);
+    return constructed.toString();
+  }
+
   @Test
   public void testConstructName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+    assertEquals(
+        "/path/to/output-001-of-123.txt",
+        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
 
-    assertEquals("out.txt/part-00042",
-        constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
+    assertEquals(
+        "/path/to/out.txt/part-00042",
+        constructName("/path/to/out.txt", "/part-SSSSS", "", 42, 100, null, null));
 
-    assertEquals("out.txt",
-        constructName("ou", "t.t", "xt", 1, 1, null, null));
+    assertEquals("/path/to/out.txt", constructName("/path/to/ou", "t.t", "xt", 1, 1, null, null));
 
-    assertEquals("out0102shard.txt",
-        constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
+    assertEquals(
+        "/path/to/out0102shard.txt",
+        constructName("/path/to/out", "SSNNshard", ".txt", 1, 2, null, null));
 
-    assertEquals("out-2/1.part-1-of-2.txt",
-        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
+    assertEquals(
+        "/path/to/out-2/1.part-1-of-2.txt",
+        constructName("/path/to/out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
   }
 
   @Test
   public void testConstructNameWithLargeShardCount() {
-    assertEquals("out-100-of-5000.txt",
-        constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
+    assertEquals(
+        "/out-100-of-5000.txt", constructName("/out", "-SS-of-NN", ".txt", 100, 5000, null, null));
   }
 
   @Test
   public void testConstructWindowedName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
-
-    assertEquals("output-001-of-123-PPP-W.txt",
-        constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
-
-    assertEquals("out.txt/part-00042-myPaneStr-myWindowStr",
-        constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr",
-            "myWindowStr"));
-
-    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2",
-        "anotherWindowStr"));
-
-    assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
-        constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr",
-            "oneMoreWindowStr"));
-
-    assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
-        + "panemyPaneStr3.txt",
-        constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3",
-        "slidingWindow1"));
+    assertEquals(
+        "/path/to/output-001-of-123.txt",
+        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+
+    assertEquals(
+        "/path/to/output-001-of-123-PPP-W.txt",
+        constructName("/path/to/output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
+
+    assertEquals(
+        "/path/to/out.txt/part-00042-myPaneStr-myWindowStr",
+        constructName(
+            "/path/to/out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr"));
+
+    assertEquals(
+        "/path/to/out.txt",
+        constructName("/path/to/ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr"));
+
+    assertEquals(
+        "/path/to/out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
+        constructName(
+            "/path/to/out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr"));
+
+    assertEquals(
+        "/out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
+            + "panemyPaneStr3.txt",
+        constructName(
+            "/out",
+            "-N/S.part-S-of-N-W-P-windowW-paneP",
+            ".txt",
+            1,
+            2,
+            "myPaneStr3",
+            "slidingWindow1"));
 
     // test first/last pane
-    assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false",
-        constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false",
-            "myWindowStr"));
-
-    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane",
-        "anotherWindowStr"));
-
-    assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
-        constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false",
-            "oneMoreWindowStr"));
-
-    assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
-        constructName("out",
-        "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
+    assertEquals(
+        "/out.txt/part-00042-myWindowStr-pane-11-true-false",
+        constructName(
+            "/out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", "myWindowStr"));
+
+    assertEquals(
+        "/path/to/out.txt",
+        constructName("/path/to/ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr"));
+
+    assertEquals(
+        "/out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
+        constructName(
+            "/out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr"));
+
+    assertEquals(
+        "/path/to/out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
+        constructName(
+            "/path/to/out", "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
   }
 
 }
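
For readers decoding the templates exercised above: runs of 'S' expand to the
shard number and runs of 'N' to the shard count, each zero-padded to at least
the template width, while 'P' and 'W' substitute the pane and window strings
when windowed writes supply them. A worked example mirroring the first
assertion:

    // Template "-SSS-of-NNN" with shardNum=1, numShards=123 -> "-001-of-123";
    // appended to base "/path/to/output" plus suffix ".txt", this yields
    // "/path/to/output-001-of-123.txt".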

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
index 6615a2e..a7644b6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -39,7 +39,7 @@ public class DrunkWritableByteChannelFactory implements WritableByteChannelFacto
   }
 
   @Override
-  public String getFilenameSuffix() {
+  public String getSuggestedFilenameSuffix() {
     return ".drunk";
   }
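
The rename mirrors the OutputFileHints naming seen in AvroIOTest above: the
suffix is a suggestion that a filename policy may or may not honor. A sketch of
a custom factory under that assumption; the gzip wrapping and the getMimeType
signature are illustrative, not part of this commit:

    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import java.util.zip.GZIPOutputStream;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.util.MimeTypes;

    // Hypothetical compressing factory exercising the renamed method.
    class GzipChannelFactory implements FileBasedSink.WritableByteChannelFactory {
      @Override
      public WritableByteChannel create(WritableByteChannel channel) throws IOException {
        // Wrap the sink's channel so all bytes pass through gzip on the way out.
        return Channels.newChannel(new GZIPOutputStream(Channels.newOutputStream(channel)));
      }

      @Override
      public String getSuggestedFilenameSuffix() {
        return ".gz";
      }

      @Override
      public String getMimeType() {
        return MimeTypes.BINARY;
      }
    }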
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index caad759..755bb59 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -103,7 +103,7 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.openUnwindowed(testUid, -1);
+    writer.openUnwindowed(testUid, -1, null);
     for (String value : values) {
       writer.write(value);
     }
@@ -198,23 +198,27 @@ public class FileBasedSinkTest {
       throws Exception {
     int numFiles = temporaryFiles.size();
 
-    List<FileResult> fileResults = new ArrayList<>();
+    List<FileResult<Void>> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
     for (int i = 0; i < numFiles; i++) {
       fileResults.add(
-          new FileResult(
+          new FileResult<Void>(
               LocalResources.fromFile(temporaryFiles.get(i), false),
               WriteFiles.UNKNOWN_SHARDNUM,
               null,
+              null,
               null));
     }
 
     writeOp.finalize(fileResults);
 
-    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
     for (int i = 0; i < numFiles; i++) {
-      ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
-          .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+      ResourceId outputFilename =
+          writeOp
+              .getSink()
+              .getDynamicDestinations()
+              .getFilenamePolicy(null)
+              .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
       assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
@@ -231,11 +235,12 @@ public class FileBasedSinkTest {
   private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
     String prefix = "file";
-    SimpleSink sink =
-        new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
 
-    WriteOperation<String> writeOp =
-        new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+    WriteOperation<String, Void> writeOp =
+        new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
@@ -272,8 +277,6 @@ public class FileBasedSinkTest {
   @Test
   public void testCopyToOutputFiles() throws Exception {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
-
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
@@ -292,9 +295,14 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
-          writeOp.getSink().getFilenamePolicy()
-              .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
+      inputFilePaths.put(
+          LocalResources.fromFile(inputTmpFile, false),
+          writeOp
+              .getSink()
+              .getDynamicDestinations()
+              .getFilenamePolicy(null)
+              .unwindowedFilename(
+                  new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
     }
 
     // Copy input files to output files.
@@ -311,7 +319,8 @@ public class FileBasedSinkTest {
       ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
     List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
+      filenames.add(
+          policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
     }
     return filenames;
   }
@@ -326,8 +335,10 @@ public class FileBasedSinkTest {
     List<ResourceId> actual;
     ResourceId root = getBaseOutputDirectory();
 
-    SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
-    FilenamePolicy policy = sink.getFilenamePolicy();
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
+    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
     expected = Arrays.asList(
         root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
@@ -352,8 +363,9 @@ public class FileBasedSinkTest {
   @Test
   public void testCollidingOutputFilenames() throws IOException {
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
+    SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
 
     ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
     ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
@@ -361,11 +373,11 @@ public class FileBasedSinkTest {
     ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult> results =
+      Iterable<FileResult<Void>> results =
           Lists.newArrayList(
-              new FileResult(temp1, 1, null, null),
-              new FileResult(temp2, 1, null, null),
-              new FileResult(temp3, 1, null, null));
+              new FileResult<Void>(temp1, 1, null, null, null),
+              new FileResult<Void>(temp2, 1, null, null, null),
+              new FileResult<Void>(temp3, 1, null, null, null));
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
@@ -379,8 +391,10 @@ public class FileBasedSinkTest {
     List<ResourceId> expected;
     List<ResourceId> actual;
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
-    FilenamePolicy policy = sink.getFilenamePolicy();
+    SimpleSink<Void> sink =
+        SimpleSink.makeSimpleSink(
+            root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
+    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
 
     expected = Arrays.asList(
         root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
@@ -486,10 +500,11 @@ public class FileBasedSinkTest {
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    WriteOperation<String> writeOp =
-        new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
+    WriteOperation<String, Void> writeOp =
+        SimpleSink.makeSimpleSink(
+                root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final Writer<String> writer = writeOp.createWriter();
+    final Writer<String, Void> writer = writeOp.createWriter();
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 
@@ -503,7 +518,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1);
+    writer.openUnwindowed(testUid, -1, null);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();
@@ -513,20 +528,20 @@ public class FileBasedSinkTest {
   }
 
   /** Build a SimpleSink with default options. */
-  private SimpleSink buildSink() {
-    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
+  private SimpleSink<Void> buildSink() {
+    return SimpleSink.makeSimpleSink(
+        getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
   }
 
-  /**
-   * Build a SimpleWriteOperation with default options and the given temporary directory.
-   */
-  private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
-    SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
+  /** Build a SimpleWriteOperation with default options and the given temporary directory. */
+  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(
+      ResourceId tempDirectory) {
+    SimpleSink<Void> sink = buildSink();
+    return new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
   }
 
   /** Build a write operation with the default options for it and its parent sink. */
-  private SimpleSink.SimpleWriteOperation buildWriteOperation() {
+  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
     return buildSink().createWriteOperation();
   }
 }
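
The extra null threaded through FileResult's constructor in these tests is the
new destination slot (Void for a single-destination sink). Assuming the argument
order used above (temp file, shard, window, pane, destination), a non-trivial
destination would look roughly like this, where tempFile, window, and paneInfo
are placeholders:

    // Illustrative: a FileResult carrying a String destination, so finalize()
    // can fetch the matching policy via getDynamicDestinations().getFilenamePolicy(dest).
    FileResult<String> result =
        new FileResult<>(tempFile, WriteFiles.UNKNOWN_SHARDNUM, window, paneInfo, "logs");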

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index bdf37f6..9196178 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,37 +19,55 @@ package org.apache.beam.sdk.io;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /**
- * A simple {@link FileBasedSink} that writes {@link String} values as lines with
- * header and footer.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
  */
-class SimpleSink extends FileBasedSink<String> {
-  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
-    this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+  public SimpleSink(
+      ResourceId tempDirectory,
+      DynamicDestinations<String, DestinationT> dynamicDestinations,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
   }
 
-  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
-                    WritableByteChannelFactory writableByteChannelFactory) {
-    super(
-        StaticValueProvider.of(baseOutputDirectory),
-        new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
-        writableByteChannelFactory);
+  public static SimpleSink<Void> makeSimpleSink(
+      ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
+    return new SimpleSink<>(
+        tempDirectory,
+        DynamicFileDestinations.<String>constant(filenamePolicy),
+        CompressionType.UNCOMPRESSED);
   }
 
-  public SimpleSink(ResourceId baseOutputDirectory, FilenamePolicy filenamePolicy) {
-    super(StaticValueProvider.of(baseOutputDirectory), filenamePolicy);
+  public static SimpleSink<Void> makeSimpleSink(
+      ResourceId baseDirectory,
+      String prefix,
+      String shardTemplate,
+      String suffix,
+      WritableByteChannelFactory writableByteChannelFactory) {
+    DynamicDestinations<String, Void> dynamicDestinations =
+        DynamicFileDestinations.constant(
+            DefaultFilenamePolicy.fromParams(
+                new Params()
+                    .withBaseFilename(
+                        baseDirectory.resolve(prefix, StandardResolveOptions.RESOLVE_FILE))
+                    .withShardTemplate(shardTemplate)
+                    .withSuffix(suffix)));
+    return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
   }
 
   @Override
-  public SimpleWriteOperation createWriteOperation() {
-    return new SimpleWriteOperation(this);
+  public SimpleWriteOperation<DestinationT> createWriteOperation() {
+    return new SimpleWriteOperation<>(this);
   }
 
-  static final class SimpleWriteOperation extends WriteOperation<String> {
+  static final class SimpleWriteOperation<DestinationT>
+      extends WriteOperation<String, DestinationT> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -59,12 +77,12 @@ class SimpleSink extends FileBasedSink<String> {
     }
 
     @Override
-    public SimpleWriter createWriter() throws Exception {
-      return new SimpleWriter(this);
+    public SimpleWriter<DestinationT> createWriter() throws Exception {
+      return new SimpleWriter<>(this);
     }
   }
 
-  static final class SimpleWriter extends Writer<String> {
+  static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";