Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:48 UTC

[36/50] [abbrv] beam git commit: Adds DynamicDestinations support to FileBasedSink

Adds DynamicDestinations support to FileBasedSink
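
The user-visible shape of the change: FileBasedSink is now parameterized by an
output type and a destination type, is constructed with a DynamicDestinations
(DynamicFileDestinations.constant(...) covers the single-destination case), and
WriteFiles.to() pairs a sink with a format function from the user type to the
output type. FilenamePolicy objects are now passed directly to to() rather than
via withFilenamePolicy(). A minimal sketch of the updated windowed-write setup,
taken from the WriteOneFilePerWindow change below (filenamePrefix and numShards
are that example's fields):

    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
    TextIO.Write write =
        TextIO.write()
            .to(new PerWindowFiles(resource))   // the policy now goes to to()
            .withTempDirectory(resource.getCurrentDirectory())
            .withWindowedWrites();
    if (numShards != null) {
      write = write.withNumShards(numShards);
    }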


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4c336e84
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4c336e84
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4c336e84

Branch: refs/heads/DSL_SQL
Commit: 4c336e840e69e83e15d9ffb7e0a0178dd3ab8404
Parents: 1f6117f
Author: Reuven Lax <re...@google.com>
Authored: Fri Jun 9 17:11:32 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:01 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../beam/examples/WindowedWordCountIT.java      |   4 +-
 .../complete/game/utils/WriteToText.java        |  43 +-
 .../construction/WriteFilesTranslation.java     |  67 +-
 .../construction/PTransformMatchersTest.java    |  22 +-
 .../construction/WriteFilesTranslationTest.java |  62 +-
 .../direct/WriteWithShardingFactory.java        |   6 +-
 .../direct/WriteWithShardingFactoryTest.java    |  18 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  15 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  35 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |  26 +-
 .../src/main/proto/beam_runner_api.proto        |   7 +-
 .../apache/beam/sdk/coders/ShardedKeyCoder.java |  66 ++
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 220 ++++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  32 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 274 +++++---
 .../beam/sdk/io/DynamicFileDestinations.java    | 115 ++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 513 ++++++++-------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  44 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 488 ++++++++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  22 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 640 +++++++++++--------
 .../sdk/transforms/SerializableFunctions.java   |  50 ++
 .../org/apache/beam/sdk/values/ShardedKey.java  |  65 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  85 ++-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  | 135 ++--
 .../sdk/io/DrunkWritableByteChannelFactory.java |   2 +-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  93 +--
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  56 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java | 264 +++++++-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  | 339 ++++++++--
 .../beam/sdk/io/gcp/bigquery/BatchLoads.java    |   2 +
 .../io/gcp/bigquery/DynamicDestinations.java    |  29 +-
 .../io/gcp/bigquery/GenerateShardedTable.java   |   1 +
 .../beam/sdk/io/gcp/bigquery/ShardedKey.java    |  67 --
 .../sdk/io/gcp/bigquery/ShardedKeyCoder.java    |  74 ---
 .../sdk/io/gcp/bigquery/StreamingWriteFn.java   |   1 +
 .../io/gcp/bigquery/StreamingWriteTables.java   |   2 +
 .../sdk/io/gcp/bigquery/TagWithUniqueIds.java   |   1 +
 .../io/gcp/bigquery/WriteBundlesToFiles.java    |   2 +
 .../bigquery/WriteGroupedRecordsToFiles.java    |   1 +
 .../sdk/io/gcp/bigquery/WritePartition.java     |   1 +
 .../beam/sdk/io/gcp/bigquery/WriteTables.java   |   1 +
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |   2 +
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  21 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |   4 +-
 47 files changed, 2710 insertions(+), 1363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 5e6df9c..49865ba 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.examples.common;
 
-import static com.google.common.base.Verify.verifyNotNull;
+import static com.google.common.base.MoreObjects.firstNonNull;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -53,22 +54,12 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
 
   @Override
   public PDone expand(PCollection<String> input) {
-    // filenamePrefix may contain a directory and a filename component. Pull out only the filename
-    // component from that path for the PerWindowFiles.
-    String prefix = "";
     ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
-    if (!resource.isDirectory()) {
-      prefix = verifyNotNull(
-          resource.getFilename(),
-          "A non-directory resource should have a non-null filename: %s",
-          resource);
-    }
-
-
-    TextIO.Write write = TextIO.write()
-        .to(resource.getCurrentDirectory())
-        .withFilenamePolicy(new PerWindowFiles(prefix))
-        .withWindowedWrites();
+    TextIO.Write write =
+        TextIO.write()
+            .to(new PerWindowFiles(resource))
+            .withTempDirectory(resource.getCurrentDirectory())
+            .withWindowedWrites();
     if (numShards != null) {
       write = write.withNumShards(numShards);
     }
@@ -83,31 +74,36 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
    */
   public static class PerWindowFiles extends FilenamePolicy {
 
-    private final String prefix;
+    private final ResourceId baseFilename;
 
-    public PerWindowFiles(String prefix) {
-      this.prefix = prefix;
+    public PerWindowFiles(ResourceId baseFilename) {
+      this.baseFilename = baseFilename;
     }
 
     public String filenamePrefixForWindow(IntervalWindow window) {
+      String prefix =
+          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
       return String.format("%s-%s-%s",
           prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext context, String extension) {
+    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename = String.format(
-          "%s-%s-of-%s%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
-          extension);
-      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+      String filename =
+          String.format(
+              "%s-%s-of-%s%s",
+              filenamePrefixForWindow(window),
+              context.getShardNumber(),
+              context.getNumShards(),
+              outputFileHints.getSuggestedFilenameSuffix());
+      return baseFilename
+          .getCurrentDirectory()
+          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(
-        ResourceId outputDirectory, Context context, String extension) {
+    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }
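
Note the new FilenamePolicy callback signatures above: a policy no longer
receives an output directory or extension argument; it carries its own base
ResourceId, and OutputFileHints.getSuggestedFilenameSuffix() replaces the old
extension parameter. For reference, a minimal unwindowed policy under the new
API might look like the following sketch (SimplePolicy is an illustrative name,
using the same imports as the example above; Context exposes shard number and
count just as WindowedContext does):

    static class SimplePolicy extends FileBasedSink.FilenamePolicy {
      private final ResourceId base;  // the policy now owns its base resource

      SimplePolicy(ResourceId base) {
        this.base = base;
      }

      @Override
      public ResourceId unwindowedFilename(Context context, OutputFileHints hints) {
        // Resolve against the policy's own directory; the suffix comes from the hints.
        String filename =
            String.format(
                "%s-%05d-of-%05d%s",
                base.getFilename(),
                context.getShardNumber(),
                context.getNumShards(),
                hints.getSuggestedFilenameSuffix());
        return base.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId windowedFilename(WindowedContext context, OutputFileHints hints) {
        throw new UnsupportedOperationException("This policy is unwindowed.");
      }
    }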

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index eb7e4c4..bec7952 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.examples.common.ExampleUtils;
 import org.apache.beam.examples.common.WriteOneFilePerWindow.PerWindowFiles;
 import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -149,7 +150,8 @@ public class WindowedWordCountIT {
 
     String outputPrefix = options.getOutput();
 
-    PerWindowFiles filenamePolicy = new PerWindowFiles(outputPrefix);
+    PerWindowFiles filenamePolicy =
+        new PerWindowFiles(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
 
     List<ShardedFile> expectedOutputFiles = Lists.newArrayListWithCapacity(6);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
index e6c8ddb..1d60198 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToText.java
@@ -18,7 +18,6 @@
 package org.apache.beam.examples.complete.game.utils;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -28,6 +27,7 @@ import java.util.TimeZone;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -111,21 +111,12 @@ public class WriteToText<InputT>
       checkArgument(
           input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
 
-      // filenamePrefix may contain a directory and a filename component. Pull out only the filename
-      // component from that path for the PerWindowFiles.
-      String prefix = "";
       ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
-      if (!resource.isDirectory()) {
-        prefix = verifyNotNull(
-            resource.getFilename(),
-            "A non-directory resource should have a non-null filename: %s",
-            resource);
-      }
 
       return input.apply(
           TextIO.write()
-              .to(resource.getCurrentDirectory())
-              .withFilenamePolicy(new PerWindowFiles(prefix))
+              .to(new PerWindowFiles(resource))
+              .withTempDirectory(resource.getCurrentDirectory())
               .withWindowedWrites()
               .withNumShards(3));
     }
@@ -139,31 +130,33 @@ public class WriteToText<InputT>
    */
   protected static class PerWindowFiles extends FilenamePolicy {
 
-    private final String prefix;
+    private final ResourceId prefix;
 
-    public PerWindowFiles(String prefix) {
+    public PerWindowFiles(ResourceId prefix) {
       this.prefix = prefix;
     }
 
     public String filenamePrefixForWindow(IntervalWindow window) {
-      return String.format("%s-%s-%s",
-          prefix, formatter.print(window.start()), formatter.print(window.end()));
+      String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
+      return String.format(
+          "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
     }
 
     @Override
-    public ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext context, String extension) {
+    public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      String filename = String.format(
-          "%s-%s-of-%s%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
-          extension);
-      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+      String filename =
+          String.format(
+              "%s-%s-of-%s%s",
+              filenamePrefixForWindow(window),
+              context.getShardNumber(),
+              context.getNumShards(),
+              outputFileHints.getSuggestedFilenameSuffix());
+      return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(
-        ResourceId outputDirectory, Context context, String extension) {
+    public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index 99b77ef..b1d2da4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -26,6 +26,7 @@ import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -51,32 +53,45 @@ public class WriteFilesTranslation {
   public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
       "urn:beam:file_based_sink:javasdk:0.1";
 
+  public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
+      "urn:beam:file_based_sink_format_function:javasdk:0.1";
+
   @VisibleForTesting
-  static WriteFilesPayload toProto(WriteFiles<?> transform) {
+  static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
     return WriteFilesPayload.newBuilder()
         .setSink(toProto(transform.getSink()))
+        .setFormatFunction(toProto(transform.getFormatFunction()))
         .setWindowedWrites(transform.isWindowedWrites())
         .setRunnerDeterminedSharding(
             transform.getNumShards() == null && transform.getSharding() == null)
         .build();
   }
 
-  private static SdkFunctionSpec toProto(FileBasedSink<?> sink) {
+  private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
+  }
+
+  private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
+    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
+  }
+
+  private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
             FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_JAVA_FILE_BASED_SINK_URN)
+                .setUrn(urn)
                 .setParameter(
                     Any.pack(
                         BytesValue.newBuilder()
                             .setValue(
-                                ByteString.copyFrom(SerializableUtils.serializeToByteArray(sink)))
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(serializable)))
                             .build())))
         .build();
   }
 
   @VisibleForTesting
-  static FileBasedSink<?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+  static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
     checkArgument(
         sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
         "Cannot extract %s instance from %s with URN %s",
@@ -87,16 +102,44 @@ public class WriteFilesTranslation {
     byte[] serializedSink =
         sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
 
-    return (FileBasedSink<?>)
+    return (FileBasedSink<?, ?>)
         SerializableUtils.deserializeFromByteArray(
             serializedSink, FileBasedSink.class.getSimpleName());
   }
 
-  public static <T> FileBasedSink<T> getSink(
-      AppliedPTransform<PCollection<T>, PDone, ? extends PTransform<PCollection<T>, PDone>>
+  @VisibleForTesting
+  static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
+      SdkFunctionSpec sinkProto) throws IOException {
+    checkArgument(
+        sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
+        "Cannot extract %s instance from %s with URN %s",
+        SerializableFunction.class.getSimpleName(),
+        FunctionSpec.class.getSimpleName(),
+        sinkProto.getSpec().getUrn());
+
+    byte[] serializedFunction =
+        sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
+
+    return (SerializableFunction<InputT, OutputT>)
+        SerializableUtils.deserializeFromByteArray(
+            serializedFunction, FileBasedSink.class.getSimpleName());
+  }
+
+  public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+      AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+          transform)
+      throws IOException {
+    return (FileBasedSink<OutputT, DestinationT>)
+        sinkFromProto(getWriteFilesPayload(transform).getSink());
+  }
+
+  public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
+      AppliedPTransform<
+              PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
           transform)
       throws IOException {
-    return (FileBasedSink<T>) sinkFromProto(getWriteFilesPayload(transform).getSink());
+    return formatFunctionFromProto(
+        getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
   }
 
   public static <T> boolean isWindowedWrites(
@@ -124,15 +167,15 @@ public class WriteFilesTranslation {
         .unpack(WriteFilesPayload.class);
   }
 
-  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?>> {
+  static class WriteFilesTranslator implements TransformPayloadTranslator<WriteFiles<?, ?, ?>> {
     @Override
-    public String getUrn(WriteFiles<?> transform) {
+    public String getUrn(WriteFiles<?, ?, ?> transform) {
       return PTransformTranslation.WRITE_FILES_TRANSFORM_URN;
     }
 
     @Override
     public FunctionSpec translate(
-        AppliedPTransform<?, ?, WriteFiles<?>> transform, SdkComponents components) {
+        AppliedPTransform<?, ?, WriteFiles<?, ?, ?>> transform, SdkComponents components) {
       return FunctionSpec.newBuilder()
           .setUrn(getUrn(transform.getTransform()))
           .setParameter(Any.pack(toProto(transform.getTransform())))
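
With this change WriteFilesPayload carries two Java-serialized SdkFunctionSpecs,
the sink and the format function, each tagged with its own URN. A hedged sketch
of how a runner override can reassemble the transform from a matched
application, mirroring the Direct and Dataflow runner changes below (rebuild is
an illustrative name):

    static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> rebuild(
        AppliedPTransform<
                PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
            transform)
        throws IOException {
      // Both pieces were serialized into the payload by toProto() above.
      FileBasedSink<OutputT, DestinationT> sink =
          WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform);
      SerializableFunction<UserT, OutputT> format =
          WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform);
      return WriteFiles.<UserT, DestinationT, OutputT>to(sink, format);
    }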

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 6459849..99d3dd1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.LocalResources;
@@ -55,6 +56,7 @@ import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -537,30 +539,32 @@ public class PTransformMatchersTest implements Serializable {
   public void writeWithRunnerDeterminedSharding() {
     ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
     FilenamePolicy policy =
-        DefaultFilenamePolicy.constructUsingStandardParameters(
+        DefaultFilenamePolicy.fromStandardParameters(
             StaticValueProvider.of(outputDirectory),
             DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
             "",
             false);
-    WriteFiles<Integer> write =
+    WriteFiles<Integer, Void, Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
+            new FileBasedSink<Integer, Void>(
+                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
               @Override
-              public WriteOperation<Integer> createWriteOperation() {
+              public WriteOperation<Integer, Void> createWriteOperation() {
                 return null;
               }
-            });
+            },
+            SerializableFunctions.<Integer>identity());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));
 
-    WriteFiles<Integer> withStaticSharding = write.withNumShards(3);
+    WriteFiles<Integer, Void, Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));
 
-    WriteFiles<Integer> withCustomSharding =
+    WriteFiles<Integer, Void, Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -568,8 +572,8 @@ public class PTransformMatchersTest implements Serializable {
         is(false));
   }
 
-  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer>>of(
+  private AppliedPTransform<?, ?, ?> appliedWrite(WriteFiles<Integer, Void, Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, WriteFiles<Integer, Void, Integer>>of(
         "WriteFiles",
         Collections.<TupleTag<?>, PValue>emptyMap(),
         Collections.<TupleTag<?>, PValue>emptyMap(),

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 739034c..283df16 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -26,8 +26,10 @@ import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -36,6 +38,8 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.junit.Test;
@@ -56,16 +60,17 @@ public class WriteFilesTranslationTest {
   @RunWith(Parameterized.class)
   public static class TestWriteFilesPayloadTranslation {
     @Parameters(name = "{index}: {0}")
-    public static Iterable<WriteFiles<?>> data() {
-      return ImmutableList.<WriteFiles<?>>of(
-          WriteFiles.to(new DummySink()),
-          WriteFiles.to(new DummySink()).withWindowedWrites(),
-          WriteFiles.to(new DummySink()).withNumShards(17),
-          WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
+    public static Iterable<WriteFiles<Object, Void, Object>> data() {
+      SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
+      return ImmutableList.of(
+          WriteFiles.to(new DummySink(), format),
+          WriteFiles.to(new DummySink(), format).withWindowedWrites(),
+          WriteFiles.to(new DummySink(), format).withNumShards(17),
+          WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
     }
 
     @Parameter(0)
-    public WriteFiles<String> writeFiles;
+    public WriteFiles<String, Void, String> writeFiles;
 
     public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
@@ -80,7 +85,7 @@ public class WriteFilesTranslationTest {
       assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
 
       assertThat(
-          (FileBasedSink<String>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
           equalTo(writeFiles.getSink()));
     }
 
@@ -89,9 +94,9 @@ public class WriteFilesTranslationTest {
       PCollection<String> input = p.apply(Create.of("hello"));
       PDone output = input.apply(writeFiles);
 
-      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String>> appliedPTransform =
-          AppliedPTransform.<PCollection<String>, PDone, WriteFiles<String>>of(
-              "foo", input.expand(), output.expand(), writeFiles, p);
+      AppliedPTransform<PCollection<String>, PDone, WriteFiles<String, Void, String>>
+          appliedPTransform =
+              AppliedPTransform.of("foo", input.expand(), output.expand(), writeFiles, p);
 
       assertThat(
           WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
@@ -101,7 +106,9 @@ public class WriteFilesTranslationTest {
           WriteFilesTranslation.isWindowedWrites(appliedPTransform),
           equalTo(writeFiles.isWindowedWrites()));
 
-      assertThat(WriteFilesTranslation.getSink(appliedPTransform), equalTo(writeFiles.getSink()));
+      assertThat(
+          WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform),
+          equalTo(writeFiles.getSink()));
     }
   }
 
@@ -109,16 +116,16 @@ public class WriteFilesTranslationTest {
    * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
    * any issues serializing mocks.
    */
-  private static class DummySink extends FileBasedSink<String> {
+  private static class DummySink extends FileBasedSink<Object, Void> {
 
     DummySink() {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
-          new DummyFilenamePolicy());
+          DynamicFileDestinations.constant(new DummyFilenamePolicy()));
     }
 
     @Override
-    public WriteOperation<String> createWriteOperation() {
+    public WriteOperation<Object, Void> createWriteOperation() {
       return new DummyWriteOperation(this);
     }
 
@@ -130,46 +137,39 @@ public class WriteFilesTranslationTest {
 
       DummySink that = (DummySink) other;
 
-      return getFilenamePolicy().equals(((DummySink) other).getFilenamePolicy())
-          && getBaseOutputDirectoryProvider().isAccessible()
-          && that.getBaseOutputDirectoryProvider().isAccessible()
-          && getBaseOutputDirectoryProvider()
-              .get()
-              .equals(that.getBaseOutputDirectoryProvider().get());
+      return getTempDirectoryProvider().isAccessible()
+          && that.getTempDirectoryProvider().isAccessible()
+          && getTempDirectoryProvider().get().equals(that.getTempDirectoryProvider().get());
     }
 
     @Override
     public int hashCode() {
       return Objects.hash(
           DummySink.class,
-          getFilenamePolicy(),
-          getBaseOutputDirectoryProvider().isAccessible()
-              ? getBaseOutputDirectoryProvider().get()
-              : null);
+          getTempDirectoryProvider().isAccessible() ? getTempDirectoryProvider().get() : null);
     }
   }
 
-  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<String> {
-    public DummyWriteOperation(FileBasedSink<String> sink) {
+  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
+    public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
       super(sink);
     }
 
     @Override
-    public FileBasedSink.Writer<String> createWriter() throws Exception {
+    public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
       throw new UnsupportedOperationException("Should never be called.");
     }
   }
 
   private static class DummyFilenamePolicy extends FilenamePolicy {
     @Override
-    public ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext c, String extension) {
+    public ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 
     @Nullable
     @Override
-    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
+    public ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints) {
       throw new UnsupportedOperationException("Should never be called.");
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index d8734a1..ba796ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -60,9 +60,11 @@ class WriteWithShardingFactory<InputT>
   public PTransformReplacement<PCollection<InputT>, PDone> getReplacementTransform(
       AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
           transform) {
-
     try {
-      WriteFiles<InputT> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+      WriteFiles<InputT, ?, ?> replacement =
+          WriteFiles.to(
+              WriteFilesTranslation.getSink(transform),
+              WriteFilesTranslation.getFormatFunction(transform));
       if (WriteFilesTranslation.isWindowedWrites(transform)) {
         replacement = replacement.withWindowedWrites();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 41d671f..546a181 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,9 +39,8 @@ import java.util.UUID;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.TextIO;
@@ -55,6 +54,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -137,21 +137,17 @@ public class WriteWithShardingFactoryTest implements Serializable {
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
     ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
-    FilenamePolicy policy =
-        DefaultFilenamePolicy.constructUsingStandardParameters(
-            StaticValueProvider.of(outputDirectory),
-            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE,
-            "",
-            false);
 
     PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
-            new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+            new FileBasedSink<Object, Void>(
+                StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
               @Override
-              public WriteOperation<Object> createWriteOperation() {
+              public WriteOperation<Object, Void> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
               }
-            });
+            },
+            SerializableFunctions.identity());
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 5d9f0f3..8935759 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1455,8 +1455,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   @VisibleForTesting
-  static class StreamingShardedWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, WriteFiles<T>> {
+  static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
+      implements PTransformOverrideFactory<
+          PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>> {
     // We pick 10 as a default, as it works well with the default number of workers started
     // by Dataflow.
     static final int DEFAULT_NUM_SHARDS = 10;
@@ -1467,8 +1468,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
     }
 
     @Override
-    public PTransformReplacement<PCollection<T>, PDone> getReplacementTransform(
-        AppliedPTransform<PCollection<T>, PDone, WriteFiles<T>> transform) {
+    public PTransformReplacement<PCollection<UserT>, PDone> getReplacementTransform(
+        AppliedPTransform<PCollection<UserT>, PDone, WriteFiles<UserT, DestinationT, OutputT>>
+            transform) {
       // By default, if numShards is not set WriteFiles will produce one file per bundle. In
       // streaming, there are large numbers of small bundles, resulting in many tiny files.
       // Instead we pick max workers * 2 to ensure full parallelism, but prevent too many files.
@@ -1485,7 +1487,10 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       try {
-        WriteFiles<T> replacement = WriteFiles.to(WriteFilesTranslation.getSink(transform));
+        WriteFiles<UserT, DestinationT, OutputT> replacement =
+            WriteFiles.<UserT, DestinationT, OutputT>to(
+                WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
+                WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
         if (WriteFilesTranslation.isWindowedWrites(transform)) {
           replacement = replacement.withWindowedWrites();
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index bc1a042..94985f8 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -76,6 +76,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
 import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
 import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
+import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.TextIO;
@@ -100,6 +101,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.GcsUtil;
@@ -1263,30 +1265,39 @@ public class DataflowRunnerTest implements Serializable {
   private void testStreamingWriteOverride(PipelineOptions options, int expectedNumShards) {
     TestPipeline p = TestPipeline.fromOptions(options);
 
-    StreamingShardedWriteFactory<Object> factory =
+    StreamingShardedWriteFactory<Object, Void, Object> factory =
         new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
+    WriteFiles<Object, Void, Object> original =
+        WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
-    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
-        AppliedPTransform.of(
-            "writefiles", objs.expand(), Collections.<TupleTag<?>, PValue>emptyMap(), original, p);
-
-    WriteFiles<Object> replacement = (WriteFiles<Object>)
-        factory.getReplacementTransform(originalApplication).getTransform();
+    AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
+        originalApplication =
+            AppliedPTransform.of(
+                "writefiles",
+                objs.expand(),
+                Collections.<TupleTag<?>, PValue>emptyMap(),
+                original,
+                p);
+
+    WriteFiles<Object, Void, Object> replacement =
+        (WriteFiles<Object, Void, Object>)
+            factory.getReplacementTransform(originalApplication).getTransform();
     assertThat(replacement, not(equalTo((Object) original)));
     assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
   }
 
-  private static class TestSink extends FileBasedSink<Object> {
+  private static class TestSink extends FileBasedSink<Object, Void> {
     @Override
     public void validate(PipelineOptions options) {}
 
     TestSink(String tmpFolder) {
-      super(StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
-          null);
+      super(
+          StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
+          DynamicFileDestinations.constant(null));
     }
+
     @Override
-    public WriteOperation<Object> createWriteOperation() {
+    public WriteOperation<Object, Void> createWriteOperation() {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 64ff98c..246eb81 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -52,7 +52,6 @@ import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Test;
 
-
 /**
  * Test {@link SparkRunnerDebugger} with different pipelines.
  */
@@ -85,17 +84,20 @@ public class SparkRunnerDebuggerTest {
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
         .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
-    final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
-        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
-        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
-        + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
-        + "_.groupByKey()\n"
-        + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
-        + "_.mapPartitions(new org.apache.beam.runners.spark"
-        + ".SparkRunnerDebuggerTest$PlusOne())\n"
-        + "sparkContext.union(...)\n"
-        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
-        + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
+    final String expectedPipeline =
+        "sparkContext.parallelize(Arrays.asList(...))\n"
+            + "_.mapPartitions("
+            + "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+            + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+            + "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+            + "_.groupByKey()\n"
+            + "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
+            + "_.mapPartitions(new org.apache.beam.runners.spark"
+            + ".SparkRunnerDebuggerTest$PlusOne())\n"
+            + "sparkContext.union(...)\n"
+            + "_.mapPartitions("
+            + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+            + "_.<org.apache.beam.sdk.io.TextIO$Write>";
 
     SparkRunnerDebugger.DebugSparkPipelineResult result =
         (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 24e907a..1f74afb 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -367,9 +367,12 @@ message WriteFilesPayload {
   // (Required) The SdkFunctionSpec of the FileBasedSink.
   SdkFunctionSpec sink = 1;
 
-  bool windowed_writes = 2;
+  // (Required) The format function.
+  SdkFunctionSpec format_function = 2;
 
-  bool runner_determined_sharding = 3;
+  bool windowed_writes = 3;
+
+  bool runner_determined_sharding = 4;
 }
 
 // A coder, the binary format for serialization and deserialization of data in

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
new file mode 100644
index 0000000..a86b198
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ShardedKeyCoder.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.coders;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.values.ShardedKey;
+
+/** A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}. */
+@VisibleForTesting
+public class ShardedKeyCoder<KeyT> extends StructuredCoder<ShardedKey<KeyT>> {
+  public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+    return new ShardedKeyCoder<>(keyCoder);
+  }
+
+  private final Coder<KeyT> keyCoder;
+  private final VarIntCoder shardNumberCoder;
+
+  protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+    this.keyCoder = keyCoder;
+    this.shardNumberCoder = VarIntCoder.of();
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return Arrays.asList(keyCoder);
+  }
+
+  @Override
+  public void encode(ShardedKey<KeyT> key, OutputStream outStream)
+      throws IOException {
+    keyCoder.encode(key.getKey(), outStream);
+    shardNumberCoder.encode(key.getShardNumber(), outStream);
+  }
+
+  @Override
+  public ShardedKey<KeyT> decode(InputStream inStream)
+      throws IOException {
+    return ShardedKey.of(keyCoder.decode(inStream), shardNumberCoder.decode(inStream));
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    keyCoder.verifyDeterministic();
+  }
+}
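
The wire format is simply the wrapped key coder's bytes followed by a
VarInt-encoded shard number. A small round-trip sketch (ShardedKeyCoderDemo is
an illustrative name; CoderUtils is used only for brevity):

    import org.apache.beam.sdk.coders.ShardedKeyCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;
    import org.apache.beam.sdk.values.ShardedKey;

    public class ShardedKeyCoderDemo {
      public static void main(String[] args) throws Exception {
        ShardedKeyCoder<String> coder = ShardedKeyCoder.of(StringUtf8Coder.of());
        ShardedKey<String> key = ShardedKey.of("user-42", 3);
        // Encode: key bytes first, then the VarInt shard number.
        byte[] bytes = CoderUtils.encodeToByteArray(coder, key);
        ShardedKey<String> decoded = CoderUtils.decodeFromByteArray(coder, bytes);
        System.out.println(decoded.getKey() + " / shard " + decoded.getShardNumber());
      }
    }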

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4143db2..89cadbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -35,6 +34,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
@@ -52,18 +53,19 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
- * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
- * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
- * See {@link FileSystems} for information on supported file systems and filepatterns.
+ * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
+ * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
+ * FileSystems} for information on supported file systems and filepatterns.
  *
- * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
- * To read {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes
- * a {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
+ * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
+ * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
+ * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
  * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
  * schema.
  *
  * <p>For example:
- * <pre> {@code
+ *
+ * <pre>{@code
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
@@ -75,34 +77,33 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
- * } </pre>
+ * }</pre>
  *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
- * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default
- * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a
- * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional
- * filename suffix (set via {@link Write#withSuffix(String)}, to generate output filenames in a
- * sharded way. You can override this default write filename policy using
- * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming
- * policy.
+ * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
+ * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
+ * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
+ * Write#withSuffix(String)}), to generate output filenames in a sharded way. You can override this
+ * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
+ * custom file naming policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner -
- * {@link AvroIO.Write#withWindowedWrites()} will cause windowing and triggering to be
- * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a
- * runner-chosen value, so you may need not set it yourself. A
- * {@link FileBasedSink.FilenamePolicy} must be set, and unique windows and triggers must produce
- * unique filenames.
+ * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
+ * and unique windows and triggers must produce unique filenames.
  *
- * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}.
- * To write {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)}
- * which takes a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema
- * in a JSON-encoded string form. An exception will be thrown if a record doesn't match the
- * specified schema.
+ * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
+ * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
+ * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
+ * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
+ * schema.
  *
  * <p>For example:
- * <pre> {@code
+ *
+ * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records = ...;
  * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
@@ -113,11 +114,11 @@ import org.apache.beam.sdk.values.PDone;
  * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
  *     .to("gs://my_bucket/path/to/numbers")
  *     .withSuffix(".avro"));
- * } </pre>
+ * }</pre>
  *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the
- * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can
- * be changed or overridden using {@link AvroIO.Write#withCodec}.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
+ * overridden using {@link AvroIO.Write#withCodec}.
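+ *
+ * <p>For example, one way to switch to Snappy compression (a sketch; any codec from {@code
+ * CodecFactory} may be substituted):
+ *
+ * <pre>{@code
+ * records.apply(AvroIO.write(AvroAutoGenClass.class)
+ *     .to("/path/to/file.avro")
+ *     .withCodec(CodecFactory.snappyCodec()));
+ * }</pre>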
  */
 public class AvroIO {
   /**
@@ -258,11 +259,16 @@ public class AvroIO {
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
     @Nullable abstract String getShardTemplate();
     @Nullable abstract String getFilenameSuffix();
+
+    @Nullable
+    abstract ValueProvider<ResourceId> getTempDirectory();
+
     abstract int getNumShards();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
+
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -277,6 +283,9 @@ public class AvroIO {
     abstract static class Builder<T> {
       abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
       abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+
+      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
       abstract Builder<T> setNumShards(int numShards);
       abstract Builder<T> setShardTemplate(String shardTemplate);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
@@ -296,9 +305,9 @@ public class AvroIO {
      * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
      *
      * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
-     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
-     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
-     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a
+     * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
+     * using {@link #to(FilenamePolicy)}.
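+     *
+     * <p>For example, {@code to("gs://bucket/path/out")} with the default unwindowed shard
+     * template {@code "-SSSSS-of-NNNNN"} and suffix {@code ".avro"} would produce filenames such
+     * as {@code "gs://bucket/path/out-00000-of-00002.avro"} (the shard counts here are
+     * illustrative).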
      */
     public Write<T> to(String outputPrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
@@ -306,14 +315,21 @@ public class AvroIO {
 
     /**
      * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
-     * supported file systems.
-     *
-     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     * supported file systems. This prefix is used by the {@link DefaultFilenamePolicy} to generate
+     * filenames.
      *
      * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
-     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}, and
-     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
-     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and a
+     * common suffix (if supplied using {@link #withSuffix(String)}).
+     *
+     * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
+     * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
+     * Custom filename policies do not automatically see this prefix; if you need it, explicitly
+     * pass the prefix into your {@link FilenamePolicy} object.
+     *
+     * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
+     * infer a directory for temporary files.
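+     *
+     * <p>For example (a sketch; the bucket and path are illustrative):
+     *
+     * <pre>{@code
+     * ResourceId prefix =
+     *     FileSystems.matchNewResource("gs://my_bucket/path/to/records", false); // not a directory
+     * records.apply(AvroIO.write(AvroAutoGenClass.class).to(prefix).withSuffix(".avro"));
+     * }</pre>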
      */
     @Experimental(Kind.FILESYSTEM)
     public Write<T> to(ResourceId outputPrefix) {
@@ -342,15 +358,22 @@ public class AvroIO {
     }
 
     /**
-     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+     * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
+     * directory for temporary files must be specified using {@link #withTempDirectory}.
      */
-    public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) {
+    public Write<T> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
+    /** Sets the base directory used to generate temporary files. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+      return toBuilder().setTempDirectory(tempDirectory).build();
+    }
+
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
-     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     * used with one of the default filename-prefix {@code to} overloads.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
@@ -360,8 +383,8 @@ public class AvroIO {
     }
 
     /**
-     * Configures the filename suffix for written files. This option may only be used when
-     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     * Configures the filename suffix for written files. This option may only be used with one of
+     * the default filename-prefix {@code to} overloads.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
@@ -402,9 +425,8 @@ public class AvroIO {
     /**
      * Preserves windowing of input elements and writes them to files based on the element's window.
      *
-     * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will
-     * be generated using {@link FilenamePolicy#windowedFilename}. See also
-     * {@link WriteFiles#withWindowedWrites()}.
+     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
+     * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
      */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
@@ -435,32 +457,46 @@ public class AvroIO {
       return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
-    @Override
-    public PDone expand(PCollection<T> input) {
-      checkState(getFilenamePrefix() != null,
-          "Need to set the filename prefix of an AvroIO.Write transform.");
-      checkState(
-          (getFilenamePolicy() == null)
-              || (getShardTemplate() == null && getFilenameSuffix() == null),
-          "Cannot set a filename policy and also a filename template or suffix.");
-      checkState(getSchema() != null,
-          "Need to set the schema of an AvroIO.Write transform.");
-      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
-          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
-
+    DynamicDestinations<T, Void> resolveDynamicDestinations() {
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {
-        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
-            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+        usedFilenamePolicy =
+            DefaultFilenamePolicy.fromStandardParameters(
+                getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+      }
+      return DynamicFileDestinations.constant(usedFilenamePolicy);
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      checkArgument(
+          getFilenamePrefix() != null || getTempDirectory() != null,
+          "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
+              + "transform.");
+      if (getFilenamePolicy() != null) {
+        checkArgument(
+            getShardTemplate() == null && getFilenameSuffix() == null,
+            "shardTemplate and filenameSuffix should only be used with the default "
+                + "filename policy");
       }
+      return expandTyped(input, resolveDynamicDestinations());
+    }
 
-      WriteFiles<T> write = WriteFiles.to(
-            new AvroSink<>(
-                getFilenamePrefix(),
-                usedFilenamePolicy,
-                AvroCoder.of(getRecordClass(), getSchema()),
-                getCodec(),
-                getMetadata()));
+    public <DestinationT> PDone expandTyped(
+        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+      ValueProvider<ResourceId> tempDirectory = getTempDirectory();
+      if (tempDirectory == null) {
+        tempDirectory = getFilenamePrefix();
+      }
+      WriteFiles<T, DestinationT, T> write =
+          WriteFiles.to(
+              new AvroSink<>(
+                  tempDirectory,
+                  dynamicDestinations,
+                  AvroCoder.of(getRecordClass(), getSchema()),
+                  getCodec(),
+                  getMetadata()),
+              SerializableFunctions.<T>identity());
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -473,31 +509,25 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      checkState(
-          getFilenamePrefix() != null,
-          "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
-      String outputPrefixString = null;
-      if (getFilenamePrefix().isAccessible()) {
-        ResourceId dir = getFilenamePrefix().get();
-        outputPrefixString = dir.toString();
-      } else {
-        outputPrefixString = getFilenamePrefix().toString();
+      resolveDynamicDestinations().populateDisplayData(builder);
+
+      String tempDirectory = null;
+      if (getTempDirectory() != null) {
+        tempDirectory =
+            getTempDirectory().isAccessible()
+                ? getTempDirectory().get().toString()
+                : getTempDirectory().toString();
       }
       builder
-          .add(DisplayData.item("schema", getRecordClass())
-            .withLabel("Record Schema"))
-          .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString)
-            .withLabel("Output File Prefix"))
-          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
-              .withLabel("Output Shard Name Template"))
-          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
-              .withLabel("Output File Suffix"))
-          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
-              .withLabel("Maximum Output Shards"),
-              0)
-          .addIfNotDefault(DisplayData.item("codec", getCodec().toString())
-              .withLabel("Avro Compression Codec"),
-              DEFAULT_CODEC.toString());
+          .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
+          .addIfNotDefault(
+              DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+          .addIfNotDefault(
+              DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
+              DEFAULT_CODEC.toString())
+          .addIfNotNull(
+              DisplayData.item("tempDirectory", tempDirectory)
+                  .withLabel("Directory for temporary files"));
       builder.include("Metadata", new Metadata());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 6c36266..c78870b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -32,39 +32,40 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T> extends FileBasedSink<T> {
+class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
   private final AvroCoder<T> coder;
   private final SerializableAvroCodecFactory codec;
   private final ImmutableMap<String, Object> metadata;
 
   AvroSink(
       ValueProvider<ResourceId> outputPrefix,
-      FilenamePolicy filenamePolicy,
+      DynamicDestinations<T, DestinationT> dynamicDestinations,
       AvroCoder<T> coder,
       SerializableAvroCodecFactory codec,
       ImmutableMap<String, Object> metadata) {
     // Avro handles compression internally using the codec.
-    super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED);
+    super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
     this.coder = coder;
     this.codec = codec;
     this.metadata = metadata;
   }
 
   @Override
-  public WriteOperation<T> createWriteOperation() {
+  public WriteOperation<T, DestinationT> createWriteOperation() {
     return new AvroWriteOperation<>(this, coder, codec, metadata);
   }
 
   /** A {@link WriteOperation WriteOperation} for Avro files. */
-  private static class AvroWriteOperation<T> extends WriteOperation<T> {
+  private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
 
-    private AvroWriteOperation(AvroSink<T> sink,
-                               AvroCoder<T> coder,
-                               SerializableAvroCodecFactory codec,
-                               ImmutableMap<String, Object> metadata) {
+    private AvroWriteOperation(
+        AvroSink<T, DestinationT> sink,
+        AvroCoder<T> coder,
+        SerializableAvroCodecFactory codec,
+        ImmutableMap<String, Object> metadata) {
       super(sink);
       this.coder = coder;
       this.codec = codec;
@@ -72,22 +73,23 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
 
     @Override
-    public Writer<T> createWriter() throws Exception {
+    public Writer<T, DestinationT> createWriter() throws Exception {
       return new AvroWriter<>(this, coder, codec, metadata);
     }
   }
 
   /** A {@link Writer Writer} for Avro files. */
-  private static class AvroWriter<T> extends Writer<T> {
+  private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
     private final AvroCoder<T> coder;
     private DataFileWriter<T> dataFileWriter;
     private SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
 
-    public AvroWriter(WriteOperation<T> writeOperation,
-                      AvroCoder<T> coder,
-                      SerializableAvroCodecFactory codec,
-                      ImmutableMap<String, Object> metadata) {
+    public AvroWriter(
+        WriteOperation<T, DestinationT> writeOperation,
+        AvroCoder<T> coder,
+        SerializableAvroCodecFactory codec,
+        ImmutableMap<String, Object> metadata) {
       super(writeOperation, MimeTypes.BINARY);
       this.coder = coder;
       this.codec = codec;

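The sink is now parameterized on a destination type in addition to the element type. For the
single-destination case, DestinationT is Void, and the previous FilenamePolicy behavior is
recovered by wrapping the policy with DynamicFileDestinations.constant. A minimal sketch of how
this wiring could look, mirroring expandTyped in AvroIO above (baseFilename, tempDirectory,
schema, codec, and metadata are illustrative placeholders):

    FilenamePolicy policy =
        DefaultFilenamePolicy.fromParams(
            new DefaultFilenamePolicy.Params().withBaseFilename(baseFilename));
    DynamicDestinations<GenericRecord, Void> destinations =
        DynamicFileDestinations.constant(policy);
    WriteFiles<GenericRecord, Void, GenericRecord> write =
        WriteFiles.to(
            new AvroSink<>(tempDirectory, destinations, AvroCoder.of(schema), codec, metadata),
            SerializableFunctions.<GenericRecord>identity());
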
http://git-wip-us.apache.org/repos/asf/beam/blob/4c336e84/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index f9e4ac4..7a60e49 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -20,25 +20,31 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.MoreObjects.firstNonNull;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A default {@link FilenamePolicy} for windowed and unwindowed files. This policy is constructed
@@ -51,10 +57,7 @@ import org.slf4j.LoggerFactory;
  * {@code WriteOneFilePerWindow} example pipeline.
  */
 public final class DefaultFilenamePolicy extends FilenamePolicy {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultFilenamePolicy.class);
-
-  /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
+  /** The default sharding name template. */
   public static final String DEFAULT_UNWINDOWED_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
   /** The default windowed sharding name template used when writing windowed files.
@@ -67,75 +70,184 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
       "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
 
   /*
-   * pattern for both windowed and non-windowed file names
+   * pattern for both windowed and non-windowed file names.
    */
   private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+|W|P)");
 
   /**
+   * Encapsulates constructor parameters to {@link DefaultFilenamePolicy}.
+   *
+   * <p>This is used as the {@code DestinationT} argument to allow {@link DefaultFilenamePolicy}
+   * objects to be dynamically generated.
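+   *
+   * <p>For example (a sketch; the base filename is an illustrative placeholder):
+   *
+   * <pre>{@code
+   * FilenamePolicy policy = DefaultFilenamePolicy.fromParams(
+   *     new Params()
+   *         .withBaseFilename(baseFilename)
+   *         .withShardTemplate("-SSS-of-NNN")
+   *         .withSuffix(".txt"));
+   * }</pre>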
+   */
+  public static class Params implements Serializable {
+    private final ValueProvider<ResourceId> baseFilename;
+    private final String shardTemplate;
+    private final boolean explicitTemplate;
+    private final String suffix;
+
+    /**
+     * Constructs a default {@code Params} object. The shard template will be set to the default
+     * {@link
+     * #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} value.
+     */
+    public Params() {
+      this.baseFilename = null;
+      this.shardTemplate = DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
+      this.suffix = "";
+      this.explicitTemplate = false;
+    }
+
+    private Params(
+        ValueProvider<ResourceId> baseFilename,
+        String shardTemplate,
+        String suffix,
+        boolean explicitTemplate) {
+      this.baseFilename = baseFilename;
+      this.shardTemplate = shardTemplate;
+      this.suffix = suffix;
+      this.explicitTemplate = explicitTemplate;
+    }
+
+    /**
+     * Specify that writes are windowed. This affects the default shard template, changing it to
+     * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE}.
+     */
+    public Params withWindowedWrites() {
+      String template = this.shardTemplate;
+      if (!explicitTemplate) {
+        template = DEFAULT_WINDOWED_SHARD_TEMPLATE;
+      }
+      return new Params(baseFilename, template, suffix, explicitTemplate);
+    }
+
+    /** Sets the base filename. */
+    public Params withBaseFilename(ResourceId baseFilename) {
+      return withBaseFilename(StaticValueProvider.of(baseFilename));
+    }
+
+    /** Like {@link #withBaseFilename(ResourceId)}, but takes in a {@link ValueProvider}. */
+    public Params withBaseFilename(ValueProvider<ResourceId> baseFilename) {
+      return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
+    }
+
+    /** Sets the shard template. */
+    public Params withShardTemplate(String shardTemplate) {
+      return new Params(baseFilename, shardTemplate, suffix, true);
+    }
+
+    /** Sets the suffix. */
+    public Params withSuffix(String suffix) {
+      return new Params(baseFilename, shardTemplate, suffix, explicitTemplate);
+    }
+  }
+
+  /** A Coder for {@link Params}. */
+  public static class ParamsCoder extends AtomicCoder<Params> {
+    private static final ParamsCoder INSTANCE = new ParamsCoder();
+    private Coder<String> stringCoder = StringUtf8Coder.of();
+
+    public static ParamsCoder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(Params value, OutputStream outStream) throws IOException {
+      if (value == null) {
+        throw new CoderException("cannot encode a null value");
+      }
+      stringCoder.encode(value.baseFilename.get().toString(), outStream);
+      stringCoder.encode(value.shardTemplate, outStream);
+      stringCoder.encode(value.suffix, outStream);
+    }
+
+    @Override
+    public Params decode(InputStream inStream) throws IOException {
+      ResourceId prefix =
+          FileBasedSink.convertToFileResourceIfPossible(stringCoder.decode(inStream));
+      String shardTemplate = stringCoder.decode(inStream);
+      String suffix = stringCoder.decode(inStream);
+      return new Params()
+          .withBaseFilename(prefix)
+          .withShardTemplate(shardTemplate)
+          .withSuffix(suffix);
+    }
+  }
+
+  private final Params params;
+
+  /**
    * Constructs a new {@link DefaultFilenamePolicy}.
    *
    * @see DefaultFilenamePolicy for more information on the arguments to this function.
    */
   @VisibleForTesting
-  DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) {
-    this.prefix = prefix;
-    this.shardTemplate = shardTemplate;
-    this.suffix = suffix;
+  DefaultFilenamePolicy(Params params) {
+    this.params = params;
   }
 
   /**
-   * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename
-   * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null
-   * shard name template and suffix.
+   * Constructs a {@link DefaultFilenamePolicy}.
    *
-   * <p>Any filename component of the provided resource will be used as the filename prefix.
+   * <p>This is a shortcut for:
    *
-   * <p>If provided, the shard name template will be used; otherwise
-   * {@link #DEFAULT_UNWINDOWED_SHARD_TEMPLATE} will be used for non-windowed file names and
-   * {@link #DEFAULT_WINDOWED_SHARD_TEMPLATE} will be used for windowed file names.
+   * <pre>{@code
+   *   DefaultFilenamePolicy.fromParams(new Params()
+   *     .withBaseFilename(baseFilename)
+   *     .withShardTemplate(shardTemplate)
+   *     .withSuffix(filenameSuffix)
+   *     .withWindowedWrites())
+   * }</pre>
    *
-   * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
+   * <p>In this shortcut, each {@code with} method is invoked only if the corresponding value is
+   * non-null or true.
    */
-  public static DefaultFilenamePolicy constructUsingStandardParameters(
-      ValueProvider<ResourceId> outputPrefix,
+  public static DefaultFilenamePolicy fromStandardParameters(
+      ValueProvider<ResourceId> baseFilename,
       @Nullable String shardTemplate,
       @Nullable String filenameSuffix,
       boolean windowedWrites) {
-    // Pick the appropriate default policy based on whether windowed writes are being performed.
-    String defaultTemplate =
-        windowedWrites ? DEFAULT_WINDOWED_SHARD_TEMPLATE : DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
-    return new DefaultFilenamePolicy(
-        NestedValueProvider.of(outputPrefix, new ExtractFilename()),
-        firstNonNull(shardTemplate, defaultTemplate),
-        firstNonNull(filenameSuffix, ""));
+    Params params = new Params().withBaseFilename(baseFilename);
+    if (shardTemplate != null) {
+      params = params.withShardTemplate(shardTemplate);
+    }
+    if (filenameSuffix != null) {
+      params = params.withSuffix(filenameSuffix);
+    }
+    if (windowedWrites) {
+      params = params.withWindowedWrites();
+    }
+    return fromParams(params);
   }
 
-  private final ValueProvider<String> prefix;
-  private final String shardTemplate;
-  private final String suffix;
+  /** Constructs a {@link DefaultFilenamePolicy} from a {@link Params} object. */
+  public static DefaultFilenamePolicy fromParams(Params params) {
+    return new DefaultFilenamePolicy(params);
+  }
 
   /**
    * Constructs a fully qualified name from components.
    *
-   * <p>The name is built from a prefix, shard template (with shard numbers
-   * applied), and a suffix.  All components are required, but may be empty
-   * strings.
+   * <p>The name is built from a base filename, shard template (with shard numbers applied), and a
+   * suffix. All components are required, but may be empty strings.
    *
-   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
-   * are replaced with the shard number, or number of shards respectively.
-   * "P" is replaced with by stringification of current pane.
-   * "W" is replaced by stringification of current window.
+   * <p>Within a shard template, repeating sequences of the letters "S" or "N" are replaced with the
+   * shard number or number of shards, respectively. "P" is replaced by the stringification of the
+   * current pane. "W" is replaced by the stringification of the current window.
    *
-   * <p>The numbers are formatted with leading zeros to match the length of the
-   * repeated sequence of letters.
+   * <p>The numbers are formatted with leading zeros to match the length of the repeated sequence of
+   * letters.
    *
-   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
-   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
-   * produced:  "output-001-of-100.txt".
+   * <p>For example, if baseFilename = "path/to/output", shardTemplate = "-SSS-of-NNN", and suffix =
+   * ".txt", with shardNum = 1 and numShards = 100, the following is produced:
+   * "path/to/output-001-of-100.txt".
    */
-  static String constructName(
-      String prefix, String shardTemplate, String suffix, int shardNum, int numShards,
-      String paneStr, String windowStr) {
+  static ResourceId constructName(
+      ResourceId baseFilename,
+      String shardTemplate,
+      String suffix,
+      int shardNum,
+      int numShards,
+      String paneStr,
+      String windowStr) {
+    String prefix = extractFilename(baseFilename);
     // Matcher API works with StringBuffer, rather than StringBuilder.
     StringBuffer sb = new StringBuffer();
     sb.append(prefix);
@@ -165,27 +277,37 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
     m.appendTail(sb);
 
     sb.append(suffix);
-    return sb.toString();
+    return baseFilename
+        .getCurrentDirectory()
+        .resolve(sb.toString(), StandardResolveOptions.RESOLVE_FILE);
   }
 
   @Override
   @Nullable
-  public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
-      String extension) {
-    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
-        context.getNumShards(), null, null) + extension;
-    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+  public ResourceId unwindowedFilename(Context context, OutputFileHints outputFileHints) {
+    return constructName(
+        params.baseFilename.get(),
+        params.shardTemplate,
+        params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
+        context.getShardNumber(),
+        context.getNumShards(),
+        null,
+        null);
   }
 
   @Override
-  public ResourceId windowedFilename(ResourceId outputDirectory,
-      WindowedContext context, String extension) {
+  public ResourceId windowedFilename(WindowedContext context, OutputFileHints outputFileHints) {
     final PaneInfo paneInfo = context.getPaneInfo();
     String paneStr = paneInfoToString(paneInfo);
     String windowStr = windowToString(context.getWindow());
-    String filename = constructName(prefix.get(), shardTemplate, suffix, context.getShardNumber(),
-        context.getNumShards(), paneStr, windowStr) + extension;
-    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    return constructName(
+        params.baseFilename.get(),
+        params.shardTemplate,
+        params.suffix + outputFileHints.getSuggestedFilenameSuffix(),
+        context.getShardNumber(),
+        context.getNumShards(),
+        paneStr,
+        windowStr);
   }
 
   /*
@@ -216,24 +338,32 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     String filenamePattern;
-    if (prefix.isAccessible()) {
-      filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix);
+    if (params.baseFilename.isAccessible()) {
+      filenamePattern =
+          String.format("%s%s%s", params.baseFilename.get(), params.shardTemplate, params.suffix);
     } else {
-      filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix);
+      filenamePattern =
+          String.format("%s%s%s", params.baseFilename, params.shardTemplate, params.suffix);
     }
+
+    String outputPrefixString =
+        params.baseFilename.isAccessible()
+            ? params.baseFilename.get().toString()
+            : params.baseFilename.toString();
+    builder.add(DisplayData.item("filenamePattern", filenamePattern).withLabel("Filename Pattern"));
+    builder.add(DisplayData.item("filePrefix", outputPrefixString).withLabel("Output File Prefix"));
+    builder.add(DisplayData.item("fileSuffix", params.suffix).withLabel("Output file Suffix"));
     builder.add(
-        DisplayData.item("filenamePattern", filenamePattern)
-            .withLabel("Filename Pattern"));
+        DisplayData.item("shardNameTemplate", params.shardTemplate)
+            .withLabel("Output Shard Name Template"));
   }
 
-  private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
-    @Override
-    public String apply(ResourceId input) {
-      if (input.isDirectory()) {
-        return "";
-      } else {
-        return firstNonNull(input.getFilename(), "");
-      }
+  private static String extractFilename(ResourceId input) {
+    if (input.isDirectory()) {
+      return "";
+    } else {
+      return firstNonNull(input.getFilename(), "");
     }
   }
 }