Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/29 00:49:40 UTC

[1/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.

Repository: beam
Updated Branches:
  refs/heads/master 1f2634d23 -> 540fa9b42


http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 60088de..1d4ce08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -68,8 +68,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
@@ -178,11 +176,7 @@ public class WriteFilesTest {
             "Intimidating pigeon",
             "Pedantic gull",
             "Frisky finch");
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test that WriteFiles with an empty input still produces one shard. */
@@ -193,7 +187,7 @@ public class WriteFilesTest {
         Collections.<String>emptyList(),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
     checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
   }
 
@@ -208,7 +202,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()).withNumShards(1));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   private ResourceId getBaseOutputDirectory() {
@@ -241,9 +235,7 @@ public class WriteFilesTest {
     }
 
     SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
-            .withSharding(new LargestInt());
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -264,8 +256,7 @@ public class WriteFilesTest {
         Arrays.asList("one", "two", "three", "four", "five", "six"),
         IDENTITY_MAP,
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
-            .withNumShards(20));
+        WriteFiles.to(makeSimpleSink()).withNumShards(20));
   }
 
   /** Test a WriteFiles transform with an empty PCollection. */
@@ -273,11 +264,7 @@ public class WriteFilesTest {
   @Category(NeedsRunner.class)
   public void testWriteWithEmptyPCollection() throws IOException {
     List<String> inputs = new ArrayList<>();
-    runWrite(
-        inputs,
-        IDENTITY_MAP,
-        getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+    runWrite(inputs, IDENTITY_MAP, getBaseOutputFilename(), WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test a WriteFiles with a windowed PCollection. */
@@ -295,7 +282,7 @@ public class WriteFilesTest {
         inputs,
         new WindowAndReshuffle<>(Window.<String>into(FixedWindows.of(Duration.millis(2)))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   /** Test a WriteFiles with sessions. */
@@ -314,7 +301,7 @@ public class WriteFilesTest {
         inputs,
         new WindowAndReshuffle<>(Window.<String>into(Sessions.withGapDuration(Duration.millis(1)))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity()));
+        WriteFiles.to(makeSimpleSink()));
   }
 
   @Test
@@ -328,15 +315,12 @@ public class WriteFilesTest {
         inputs,
         Window.<String>into(FixedWindows.of(Duration.millis(2))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink(), SerializableFunctions.<String>identity())
-            .withMaxNumWritersPerBundle(2)
-            .withWindowedWrites());
+        WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
   }
 
   public void testBuildWrite() {
     SimpleSink<Void> sink = makeSimpleSink();
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(3);
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink<Void>) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
@@ -358,7 +342,7 @@ public class WriteFilesTest {
 
   @Test
   public void testDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -374,8 +358,7 @@ public class WriteFilesTest {
             builder.add(DisplayData.item("foo", "bar"));
           }
         };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity());
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink);
 
     DisplayData displayData = DisplayData.from(write);
 
@@ -391,9 +374,7 @@ public class WriteFilesTest {
         "Must use windowed writes when applying WriteFiles to an unbounded PCollection");
 
     SimpleSink<Void> sink = makeSimpleSink();
-    p.apply(Create.of("foo"))
-        .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()));
+    p.apply(Create.of("foo")).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(WriteFiles.to(sink));
     p.run();
   }
 
@@ -408,13 +389,13 @@ public class WriteFilesTest {
     SimpleSink<Void> sink = makeSimpleSink();
     p.apply(Create.of("foo"))
         .setIsBoundedInternal(IsBounded.UNBOUNDED)
-        .apply(WriteFiles.to(sink, SerializableFunctions.<String>identity()).withWindowedWrites());
+        .apply(WriteFiles.to(sink).withWindowedWrites());
     p.run();
   }
 
   // Test DynamicDestinations class. Expects user values to be string-encoded integers.
   // Stores the integer mod 5 as the destination, and uses that in the file prefix.
-  static class TestDestinations extends DynamicDestinations<String, Integer> {
+  static class TestDestinations extends DynamicDestinations<String, Integer, String> {
     private ResourceId baseOutputDirectory;
 
     TestDestinations(ResourceId baseOutputDirectory) {
@@ -422,6 +403,11 @@ public class WriteFilesTest {
     }
 
     @Override
+    public String formatRecord(String record) {
+      return "record_" + record;
+    }
+
+    @Override
     public Integer getDestination(String element) {
       return Integer.valueOf(element) % 5;
     }
@@ -444,14 +430,6 @@ public class WriteFilesTest {
     }
   }
 
-  // Test format function. Prepend a string to each record before writing.
-  static class TestDynamicFormatFunction implements SerializableFunction<String, String> {
-    @Override
-    public String apply(String input) {
-      return "record_" + input;
-    }
-  }
-
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsBounded() throws Exception {
@@ -495,8 +473,7 @@ public class WriteFilesTest {
     // If emptyShards==true make numShards larger than the number of elements per destination.
     // This will force every destination to generate some empty shards.
     int numShards = emptyShards ? 2 * numInputs / 5 : 2;
-    WriteFiles<String, Integer, String> writeFiles =
-        WriteFiles.to(sink, new TestDynamicFormatFunction()).withNumShards(numShards);
+    WriteFiles<String, Integer, String> writeFiles = WriteFiles.to(sink).withNumShards(numShards);
 
     PCollection<String> input = p.apply(Create.timestamped(inputs, timestamps));
     if (!bounded) {
@@ -521,7 +498,7 @@ public class WriteFilesTest {
 
   @Test
   public void testShardedDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -537,8 +514,7 @@ public class WriteFilesTest {
             builder.add(DisplayData.item("foo", "bar"));
           }
         };
-    WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity()).withNumShards(1);
+    WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -547,7 +523,7 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -564,7 +540,7 @@ public class WriteFilesTest {
           }
         };
     WriteFiles<String, ?, String> write =
-        WriteFiles.to(sink, SerializableFunctions.<String>identity())
+        WriteFiles.to(sink)
             .withSharding(
                 new PTransform<PCollection<String>, PCollectionView<Integer>>() {
                   @Override

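For context on the pattern these test changes exercise: the format function that used to be WriteFiles.to()'s second argument now lives on the sink's DynamicDestinations (note the new TestDestinations.formatRecord override above), so callers build the transform from the sink alone. A minimal sketch of the new call shape, assuming a helper like the test's makeSimpleSink():

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.WriteFiles;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    // The sink's DynamicDestinations now owns record formatting, so WriteFiles
    // is constructed from the sink alone; the sharding knobs are unchanged.
    Pipeline p = Pipeline.create();
    PCollection<String> lines = p.apply(Create.of("one", "two", "three"));
    lines.apply(WriteFiles.to(makeSimpleSink()).withNumShards(2)); // makeSimpleSink() as in the test above
    p.run();
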
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 442fba5..7255a94 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -522,8 +521,7 @@ public class XmlIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      return input.apply(
-          org.apache.beam.sdk.io.WriteFiles.to(createSink(), SerializableFunctions.<T>identity()));
+      return input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 74e0bda..b663544 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -35,7 +35,7 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
-class XmlSink<T> extends FileBasedSink<T, Void> {
+class XmlSink<T> extends FileBasedSink<T, Void, T> {
   private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
@@ -46,7 +46,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), DynamicFileDestinations.constant(makeFilenamePolicy(spec)));
+    super(spec.getFilenamePrefix(), DynamicFileDestinations.<T>constant(makeFilenamePolicy(spec)));
     this.spec = spec;
   }
 
@@ -77,7 +77,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   /** {@link WriteOperation} for XML {@link FileBasedSink}s. */
-  protected static final class XmlWriteOperation<T> extends WriteOperation<T, Void> {
+  protected static final class XmlWriteOperation<T> extends WriteOperation<Void, T> {
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -112,7 +112,7 @@ class XmlSink<T> extends FileBasedSink<T, Void> {
   }
 
   /** A {@link Writer} that can write objects as XML elements. */
-  protected static final class XmlWriter<T> extends Writer<T, Void> {
+  protected static final class XmlWriter<T> extends Writer<Void, T> {
     final Marshaller marshaller;
     private OutputStream os = null;
 

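To summarize the signature change running through this file: FileBasedSink is now parameterized as FileBasedSink<UserT, DestinationT, OutputT>, while WriteOperation and Writer drop the user type and keep <DestinationT, OutputT>. A skeleton of a sink under the new shape, assuming the XmlSink-style case where the user and output types coincide (the class name and policy argument are illustrative):

    import org.apache.beam.sdk.io.DynamicFileDestinations;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.options.ValueProvider;

    // UserT == OutputT == T here, matching XmlSink<T> extends FileBasedSink<T, Void, T>.
    class PassThroughSink<T> extends FileBasedSink<T, Void, T> {
      PassThroughSink(ValueProvider<ResourceId> baseOutput, FilenamePolicy policy) {
        // The one-argument DynamicFileDestinations.constant() wraps a single
        // filename policy, exactly as XmlSink does above.
        super(baseOutput, DynamicFileDestinations.<T>constant(policy));
      }

      @Override
      public WriteOperation<Void, T> createWriteOperation() {
        // A real sink returns its WriteOperation<DestinationT, OutputT> here.
        throw new UnsupportedOperationException("sketch only");
      }
    }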

[4/4] beam git commit: This closes #3541: [BEAM-92] Supports DynamicDestinations in AvroIO.

Posted by jk...@apache.org.
This closes #3541: [BEAM-92] Supports DynamicDestinations in AvroIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/540fa9b4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/540fa9b4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/540fa9b4

Branch: refs/heads/master
Commit: 540fa9b4246f2e680b6550537c4dda575e5cf71f
Parents: 1f2634d 9f2622f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Jul 28 17:28:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:29 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     |   2 +-
 .../construction/WriteFilesTranslation.java     |  81 ++--
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../construction/WriteFilesTranslationTest.java |  26 +-
 .../direct/WriteWithShardingFactory.java        |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   8 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  10 +-
 .../src/main/proto/beam_runner_api.proto        |   2 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 436 +++++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  93 ++--
 .../beam/sdk/io/ConstantAvroDestination.java    | 130 ++++++
 .../beam/sdk/io/DefaultFilenamePolicy.java      |   1 -
 .../beam/sdk/io/DynamicAvroDestinations.java    |  46 ++
 .../beam/sdk/io/DynamicFileDestinations.java    |  59 ++-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 121 +++--
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  23 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 228 ++++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  14 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   6 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  10 +-
 .../org/apache/beam/sdk/io/TextIOWriteTest.java |  23 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  74 ++--
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |   8 +-
 27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------



[3/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.

Posted by jk...@apache.org.
[BEAM-92] Supports DynamicDestinations in AvroIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2622fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2622fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2622fa

Branch: refs/heads/master
Commit: 9f2622fa19da1284222e872fdcd63b086bdc3509
Parents: 1f2634d
Author: Reuven Lax <re...@google.com>
Authored: Thu Jul 6 20:22:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:12 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     |   2 +-
 .../construction/WriteFilesTranslation.java     |  81 ++--
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../construction/WriteFilesTranslationTest.java |  26 +-
 .../direct/WriteWithShardingFactory.java        |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   8 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  10 +-
 .../src/main/proto/beam_runner_api.proto        |   2 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 436 +++++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  93 ++--
 .../beam/sdk/io/ConstantAvroDestination.java    | 130 ++++++
 .../beam/sdk/io/DefaultFilenamePolicy.java      |   1 -
 .../beam/sdk/io/DynamicAvroDestinations.java    |  46 ++
 .../beam/sdk/io/DynamicFileDestinations.java    |  59 ++-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 121 +++--
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  23 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 228 ++++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  14 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   6 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  10 +-
 .../org/apache/beam/sdk/io/TextIOWriteTest.java |  23 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  74 ++--
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |   8 +-
 27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index d7b0e9f..5765c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -484,7 +484,7 @@ public class ParDoTranslation {
         });
   }
 
-  private static SideInput toProto(PCollectionView<?> view) {
+  public static SideInput toProto(PCollectionView<?> view) {
     Builder builder = SideInput.newBuilder();
     builder.setAccessPattern(
         FunctionSpec.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index b1d2da4..7954b0e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -19,29 +19,35 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi}
@@ -53,28 +59,25 @@ public class WriteFilesTranslation {
   public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
       "urn:beam:file_based_sink:javasdk:0.1";
 
-  public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
-      "urn:beam:file_based_sink_format_function:javasdk:0.1";
-
   @VisibleForTesting
   static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
+    Map<String, SideInput> sideInputs = Maps.newHashMap();
+    for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) {
+      sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
+    }
     return WriteFilesPayload.newBuilder()
         .setSink(toProto(transform.getSink()))
-        .setFormatFunction(toProto(transform.getFormatFunction()))
         .setWindowedWrites(transform.isWindowedWrites())
         .setRunnerDeterminedSharding(
             transform.getNumShards() == null && transform.getSharding() == null)
+        .putAllSideInputs(sideInputs)
         .build();
   }
 
-  private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+  private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) {
     return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
   }
 
-  private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
-    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
-  }
-
   private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
@@ -91,7 +94,7 @@ public class WriteFilesTranslation {
   }
 
   @VisibleForTesting
-  static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+  static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
     checkArgument(
         sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
         "Cannot extract %s instance from %s with URN %s",
@@ -102,44 +105,44 @@ public class WriteFilesTranslation {
     byte[] serializedSink =
         sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
 
-    return (FileBasedSink<?, ?>)
+    return (FileBasedSink<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
             serializedSink, FileBasedSink.class.getSimpleName());
   }
 
-  @VisibleForTesting
-  static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
-      SdkFunctionSpec sinkProto) throws IOException {
-    checkArgument(
-        sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
-        "Cannot extract %s instance from %s with URN %s",
-        SerializableFunction.class.getSimpleName(),
-        FunctionSpec.class.getSimpleName(),
-        sinkProto.getSpec().getUrn());
-
-    byte[] serializedFunction =
-        sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-
-    return (SerializableFunction<InputT, OutputT>)
-        SerializableUtils.deserializeFromByteArray(
-            serializedFunction, FileBasedSink.class.getSimpleName());
-  }
-
-  public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+  public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
       AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
           transform)
       throws IOException {
-    return (FileBasedSink<OutputT, DestinationT>)
+    return (FileBasedSink<UserT, DestinationT, OutputT>)
         sinkFromProto(getWriteFilesPayload(transform).getSink());
   }
 
-  public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
-      AppliedPTransform<
-              PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
-          transform)
-      throws IOException {
-    return formatFunctionFromProto(
-        getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
+  public static <UserT, DestinationT, OutputT>
+      List<PCollectionView<?>> getDynamicDestinationSideInputs(
+          AppliedPTransform<
+                  PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+              transform)
+          throws IOException {
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
+    List<PCollectionView<?>> views = Lists.newArrayList();
+    Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
+    for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
+              "no input with tag %s",
+              entry.getKey());
+      views.add(
+          ParDoTranslation.viewFromProto(
+              entry.getValue(),
+              entry.getKey(),
+              originalPCollection,
+              transformProto,
+              RehydratedComponents.forComponents(sdkComponents.toComponents())));
+    }
+    return views;
   }
 
   public static <T> boolean isWindowedWrites(

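The side-input plumbing added here serializes whatever views the sink's DynamicDestinations declares, so runners can rehydrate them and make sideInput(view) usable at write time. A sketch of the user-facing half of that contract, modeled on the AvroIO javadoc example later in this patch (the per-user directory map and the mod-5 routing are illustrative):

    import java.util.List;
    import java.util.Map;
    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.values.PCollectionView;

    // Destinations that route records by key and consult a side input when
    // naming files; getSideInputs() is what WriteFilesTranslation serializes.
    class PerUserDestinations extends FileBasedSink.DynamicDestinations<String, Integer, String> {
      private final PCollectionView<Map<Integer, String>> userToDir;

      PerUserDestinations(PCollectionView<Map<Integer, String>> userToDir) {
        this.userToDir = userToDir;
      }

      @Override
      public String formatRecord(String record) {
        return record; // pass records through unchanged
      }

      @Override
      public Integer getDestination(String element) {
        return Integer.valueOf(element) % 5;
      }

      @Override
      public Integer getDefaultDestination() {
        return 0;
      }

      @Override
      public FileBasedSink.FilenamePolicy getFilenamePolicy(Integer userId) {
        // sideInput() works here because userToDir is declared in getSideInputs().
        return DefaultFilenamePolicy.fromParams(
            new Params().withBaseFilename(sideInput(userToDir).get(userId) + "/events"));
      }

      @Override
      public List<PCollectionView<?>> getSideInputs() {
        return ImmutableList.<PCollectionView<?>>of(userToDir);
      }
    }
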
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 316645b..1862699 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -549,15 +548,14 @@ public class PTransformMatchersTest implements Serializable {
             false);
     WriteFiles<Integer, Void, Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer, Void>(
+            new FileBasedSink<Integer, Void, Integer>(
                 StaticValueProvider.of(outputDirectory),
-                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
+                DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
               @Override
-              public WriteOperation<Integer, Void> createWriteOperation() {
+              public WriteOperation<Void, Integer> createWriteOperation() {
                 return null;
               }
-            },
-            SerializableFunctions.<Integer>identity());
+            });
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 4259ac8..e067fac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -63,12 +62,11 @@ public class WriteFilesTranslationTest {
   public static class TestWriteFilesPayloadTranslation {
     @Parameters(name = "{index}: {0}")
     public static Iterable<WriteFiles<Object, Void, Object>> data() {
-      SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
       return ImmutableList.of(
-          WriteFiles.to(new DummySink(), format),
-          WriteFiles.to(new DummySink(), format).withWindowedWrites(),
-          WriteFiles.to(new DummySink(), format).withNumShards(17),
-          WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
+          WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
+          WriteFiles.to(new DummySink()).withNumShards(17),
+          WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
     }
 
     @Parameter(0)
@@ -87,7 +85,8 @@ public class WriteFilesTranslationTest {
       assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
 
       assertThat(
-          (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          (FileBasedSink<String, Void, String>)
+              WriteFilesTranslation.sinkFromProto(payload.getSink()),
           equalTo(writeFiles.getSink()));
     }
 
@@ -118,16 +117,17 @@ public class WriteFilesTranslationTest {
    * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
    * any issues serializing mocks.
    */
-  private static class DummySink extends FileBasedSink<Object, Void> {
+  private static class DummySink extends FileBasedSink<Object, Void, Object> {
 
     DummySink() {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
-          DynamicFileDestinations.constant(new DummyFilenamePolicy()));
+          DynamicFileDestinations.constant(
+              new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
     }
 
     @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
+    public WriteOperation<Void, Object> createWriteOperation() {
       return new DummyWriteOperation(this);
     }
 
@@ -152,13 +152,13 @@ public class WriteFilesTranslationTest {
     }
   }
 
-  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
-    public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
+  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> {
+    public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) {
       super(sink);
     }
 
     @Override
-    public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
+    public FileBasedSink.Writer<Void, Object> createWriter() throws Exception {
       throw new UnsupportedOperationException("Should never be called.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index ba796ae..3557c5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,10 +24,12 @@ import com.google.common.base.Suppliers;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -61,10 +63,10 @@ class WriteWithShardingFactory<InputT>
       AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
           transform) {
     try {
-      WriteFiles<InputT, ?, ?> replacement =
-          WriteFiles.to(
-              WriteFilesTranslation.getSink(transform),
-              WriteFilesTranslation.getFormatFunction(transform));
+      List<PCollectionView<?>> sideInputs =
+          WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+      FileBasedSink sink = WriteFilesTranslation.getSink(transform);
+      WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
       if (WriteFilesTranslation.isWindowedWrites(transform)) {
         replacement = replacement.withWindowedWrites();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6dd069c..d0db44e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -143,15 +142,14 @@ public class WriteWithShardingFactoryTest implements Serializable {
 
     PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
-            new FileBasedSink<Object, Void>(
+            new FileBasedSink<Object, Void, Object>(
                 StaticValueProvider.of(outputDirectory),
                 DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
-              public WriteOperation<Object, Void> createWriteOperation() {
+              public WriteOperation<Void, Object> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
               }
-            },
-            SerializableFunctions.identity());
+            });
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 762ac9f..f8d2c3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -1501,10 +1502,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       try {
+        List<PCollectionView<?>> sideInputs =
+            WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+        FileBasedSink sink = WriteFilesTranslation.getSink(transform);
         WriteFiles<UserT, DestinationT, OutputT> replacement =
-            WriteFiles.<UserT, DestinationT, OutputT>to(
-                WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
-                WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
+            WriteFiles.to(sink).withSideInputs(sideInputs);
         if (WriteFilesTranslation.isWindowedWrites(transform)) {
           replacement = replacement.withWindowedWrites();
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7556a28..9db73c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1271,8 +1271,7 @@ public class DataflowRunnerTest implements Serializable {
 
     StreamingShardedWriteFactory<Object, Void, Object> factory =
         new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object, Void, Object> original =
-        WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
+    WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
     AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
         originalApplication =
@@ -1290,7 +1289,7 @@ public class DataflowRunnerTest implements Serializable {
     assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
   }
 
-  private static class TestSink extends FileBasedSink<Object, Void> {
+  private static class TestSink extends FileBasedSink<Object, Void, Object> {
     @Override
     public void validate(PipelineOptions options) {}
 
@@ -1315,11 +1314,12 @@ public class DataflowRunnerTest implements Serializable {
                     int shardNumber, int numShards, OutputFileHints outputFileHints) {
                   throw new UnsupportedOperationException("should not be called");
                 }
-              }));
+              },
+              SerializableFunctions.identity()));
     }
 
     @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
+    public WriteOperation<Void, Object> createWriteOperation() {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 42e2601..9afb565 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -375,6 +375,8 @@ message WriteFilesPayload {
   bool windowed_writes = 3;
 
   bool runner_determined_sharding = 4;
+
+  map<string, SideInput> side_inputs = 5;
 }
 
 // A coder, the binary format for serialization and deserialization of data in

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 27c9073..824f725 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.io.BaseEncoding;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -51,7 +49,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -161,6 +158,51 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     .withSuffix(".avro"));
  * }</pre>
  *
+ * <p>The following shows a more complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a specific directory for that user,
+ * and each user's data should be written with a specific schema for that user; a side input is
+ * used, so the schema can be calculated in a different stage.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvent> events = ...;
+ * PCollectionView<Map<Integer, String>> schemaMap = events.apply(
+ *     "ComputeSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(schemaMap)));
+ * }</pre>
+ *
  * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
  * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
  * overridden using {@link AvroIO.Write#withCodec}.
@@ -256,18 +298,53 @@ public class AvroIO {
    * pattern).
    */
   public static <T> Write<T> write(Class<T> recordClass) {
-    return AvroIO.<T>defaultWriteBuilder()
-        .setRecordClass(recordClass)
-        .setSchema(ReflectData.get().getSchema(recordClass))
-        .build();
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
   }
 
   /** Writes Avro records of the specified schema. */
   public static Write<GenericRecord> writeGenericRecords(Schema schema) {
-    return AvroIO.<GenericRecord>defaultWriteBuilder()
-        .setRecordClass(GenericRecord.class)
-        .setSchema(schema)
-        .build();
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link
+   * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)} to
+   * examine the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord}, use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
   }
 
   /**
@@ -277,12 +354,12 @@ public class AvroIO {
     return writeGenericRecords(new Schema.Parser().parse(schema));
   }
 
-  private static <T> Write.Builder<T> defaultWriteBuilder() {
-    return new AutoValue_AvroIO_Write.Builder<T>()
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, OutputT>()
         .setFilenameSuffix(null)
         .setShardTemplate(null)
         .setNumShards(0)
-        .setCodec(Write.DEFAULT_CODEC)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
         .setMetadata(ImmutableMap.<String, Object>of())
         .setWindowedWrites(false);
   }
@@ -572,15 +649,18 @@ public class AvroIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
-    private static final SerializableAvroCodecFactory DEFAULT_CODEC =
-        new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
-    // This should be a multiple of 4 to not get a partial encoded byte.
-    private static final int METADATA_BYTES_MAX_LENGTH = 40;
+  public abstract static class TypedWrite<UserT, OutputT>
+      extends PTransform<PCollection<UserT>, PDone> {
+    static final CodecFactory DEFAULT_CODEC = CodecFactory.deflateCodec(6);
+    static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC =
+        new SerializableAvroCodecFactory(DEFAULT_CODEC);
+
+    @Nullable
+    abstract SerializableFunction<UserT, OutputT> getFormatFunction();
 
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
     @Nullable abstract String getShardTemplate();
@@ -590,11 +670,16 @@ public class AvroIO {
     abstract ValueProvider<ResourceId> getTempDirectory();
 
     abstract int getNumShards();
-    @Nullable abstract Class<T> getRecordClass();
+
+    abstract boolean getGenericRecords();
+
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
 
+    @Nullable
+    abstract DynamicAvroDestinations<UserT, ?, OutputT> getDynamicDestinations();
+
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -603,25 +688,39 @@ public class AvroIO {
     /** Avro file metadata. */
     abstract ImmutableMap<String, Object> getMetadata();
 
-    abstract Builder<T> toBuilder();
+    abstract Builder<UserT, OutputT> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
-      abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+    abstract static class Builder<UserT, OutputT> {
+      abstract Builder<UserT, OutputT> setFormatFunction(
+          SerializableFunction<UserT, OutputT> formatFunction);
 
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+      abstract Builder<UserT, OutputT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
 
-      abstract Builder<T> setNumShards(int numShards);
-      abstract Builder<T> setShardTemplate(String shardTemplate);
-      abstract Builder<T> setRecordClass(Class<T> recordClass);
-      abstract Builder<T> setSchema(Schema schema);
-      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
-      abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
-      abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
-      abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+      abstract Builder<UserT, OutputT> setFilenameSuffix(String filenameSuffix);
+
+      abstract Builder<UserT, OutputT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+      abstract Builder<UserT, OutputT> setNumShards(int numShards);
+
+      abstract Builder<UserT, OutputT> setShardTemplate(String shardTemplate);
+
+      abstract Builder<UserT, OutputT> setGenericRecords(boolean genericRecords);
 
-      abstract Write<T> build();
+      abstract Builder<UserT, OutputT> setSchema(Schema schema);
+
+      abstract Builder<UserT, OutputT> setWindowedWrites(boolean windowedWrites);
+
+      abstract Builder<UserT, OutputT> setFilenamePolicy(FilenamePolicy filenamePolicy);
+
+      abstract Builder<UserT, OutputT> setCodec(SerializableAvroCodecFactory codec);
+
+      abstract Builder<UserT, OutputT> setMetadata(ImmutableMap<String, Object> metadata);
+
+      abstract Builder<UserT, OutputT> setDynamicDestinations(
+          DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations);
+
+      abstract TypedWrite<UserT, OutputT> build();
     }
 
     /**
@@ -635,7 +734,7 @@ public class AvroIO {
      * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
      * using {@link #to(FilenamePolicy)}.
      */
-    public Write<T> to(String outputPrefix) {
+    public TypedWrite<UserT, OutputT> to(String outputPrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
     }
 
@@ -658,14 +757,12 @@ public class AvroIO {
      * infer a directory for temporary files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> to(ResourceId outputPrefix) {
+    public TypedWrite<UserT, OutputT> to(ResourceId outputPrefix) {
       return toResource(StaticValueProvider.of(outputPrefix));
     }
 
-    /**
-     * Like {@link #to(String)}.
-     */
-    public Write<T> to(ValueProvider<String> outputPrefix) {
+    /** Like {@link #to(String)}. */
+    public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -675,11 +772,9 @@ public class AvroIO {
           }));
     }
 
-    /**
-     * Like {@link #to(ResourceId)}.
-     */
+    /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+    public TypedWrite<UserT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) {
       return toBuilder().setFilenamePrefix(outputPrefix).build();
     }
 
@@ -687,16 +782,52 @@ public class AvroIO {
      * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
      * directory for temporary files must be specified using {@link #withTempDirectory}.
      */
-    public Write<T> to(FilenamePolicy filenamePolicy) {
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
+    /**
+     * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These
+     * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+     * temporary files must be specified using {@link #withTempDirectory}.
+     */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> to(
+        DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations) {
+      return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+    }
+
+    /**
+     * Sets the output schema. Can only be used when the output type is {@link GenericRecord}
+     * and when not using {@link #to(DynamicAvroDestinations)}.
+     */
+    public TypedWrite<UserT, OutputT> withSchema(Schema schema) {
+      return toBuilder().setSchema(schema).build();
+    }
+
+    /**
+     * Specifies a format function to convert {@link UserT} to the output type. If {@link
+     * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be
+     * used instead.
+     */
+    public TypedWrite<UserT, OutputT> withFormatFunction(
+        SerializableFunction<UserT, OutputT> formatFunction) {
+      return toBuilder().setFormatFunction(formatFunction).build();
+    }
+
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+    public TypedWrite<UserT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
       return toBuilder().setTempDirectory(tempDirectory).build();
     }
 
+    /** Set the base directory used to generate temporary files. */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> withTempDirectory(ResourceId tempDirectory) {
+      return withTempDirectory(StaticValueProvider.of(tempDirectory));
+    }
+
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
      * used when using one of the default filename-prefix to() overrides.
@@ -704,7 +835,7 @@ public class AvroIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write<T> withShardNameTemplate(String shardTemplate) {
+    public TypedWrite<UserT, OutputT> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
@@ -715,7 +846,7 @@ public class AvroIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write<T> withSuffix(String filenameSuffix) {
+    public TypedWrite<UserT, OutputT> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
@@ -729,7 +860,7 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public Write<T> withNumShards(int numShards) {
+    public TypedWrite<UserT, OutputT> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
@@ -744,7 +875,7 @@ public class AvroIO {
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public Write<T> withoutSharding() {
+    public TypedWrite<UserT, OutputT> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -754,12 +885,12 @@ public class AvroIO {
     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
      * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
      */
-    public Write<T> withWindowedWrites() {
+    public TypedWrite<UserT, OutputT> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
     /** Writes to Avro file(s) compressed using specified codec. */
-    public Write<T> withCodec(CodecFactory codec) {
+    public TypedWrite<UserT, OutputT> withCodec(CodecFactory codec) {
       return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
     }
 
@@ -768,7 +899,7 @@ public class AvroIO {
      *
      * <p>Supported value types are String, Long, and byte[].
      */
-    public Write<T> withMetadata(Map<String, Object> metadata) {
+    public TypedWrite<UserT, OutputT> withMetadata(Map<String, Object> metadata) {
       Map<String, String> badKeys = Maps.newLinkedHashMap();
       for (Map.Entry<String, Object> entry : metadata.entrySet()) {
         Object v = entry.getValue();
@@ -783,18 +914,31 @@ public class AvroIO {
       return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
-    DynamicDestinations<T, Void> resolveDynamicDestinations() {
-      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-      if (usedFilenamePolicy == null) {
-        usedFilenamePolicy =
-            DefaultFilenamePolicy.fromStandardParameters(
-                getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+    DynamicAvroDestinations<UserT, ?, OutputT> resolveDynamicDestinations() {
+      DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations = getDynamicDestinations();
+      if (dynamicDestinations == null) {
+        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+        if (usedFilenamePolicy == null) {
+          usedFilenamePolicy =
+              DefaultFilenamePolicy.fromStandardParameters(
+                  getFilenamePrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getWindowedWrites());
+        }
+        dynamicDestinations =
+            constantDestinations(
+                usedFilenamePolicy,
+                getSchema(),
+                getMetadata(),
+                getCodec().getCodec(),
+                getFormatFunction());
       }
-      return DynamicFileDestinations.constant(usedFilenamePolicy);
+      return dynamicDestinations;
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PDone expand(PCollection<UserT> input) {
       checkArgument(
           getFilenamePrefix() != null || getTempDirectory() != null,
           "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
@@ -805,24 +949,25 @@ public class AvroIO {
             "shardTemplate and filenameSuffix should only be used with the default "
                 + "filename policy");
       }
+      if (getDynamicDestinations() != null) {
+        checkArgument(
+            getFormatFunction() == null,
+            "A format function should not be specified "
+                + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+      }
+
       return expandTyped(input, resolveDynamicDestinations());
     }
 
     public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+        PCollection<UserT> input,
+        DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations) {
       ValueProvider<ResourceId> tempDirectory = getTempDirectory();
       if (tempDirectory == null) {
         tempDirectory = getFilenamePrefix();
       }
-      WriteFiles<T, DestinationT, T> write =
-          WriteFiles.to(
-              new AvroSink<>(
-                  tempDirectory,
-                  dynamicDestinations,
-                  AvroCoder.of(getRecordClass(), getSchema()),
-                  getCodec(),
-                  getMetadata()),
-              SerializableFunctions.<T>identity());
+      WriteFiles<UserT, DestinationT, OutputT> write =
+          WriteFiles.to(new AvroSink<>(tempDirectory, dynamicDestinations, getGenericRecords()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -845,33 +990,11 @@ public class AvroIO {
                 : getTempDirectory().toString();
       }
       builder
-          .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
-          .addIfNotDefault(
-              DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
-              DEFAULT_CODEC.toString())
           .addIfNotNull(
               DisplayData.item("tempDirectory", tempDirectory)
                   .withLabel("Directory for temporary files"));
-      builder.include("Metadata", new Metadata());
-    }
-
-    private class Metadata implements HasDisplayData {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
-          DisplayData.Type type = DisplayData.inferType(entry.getValue());
-          if (type != null) {
-            builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
-          } else {
-            String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
-            String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
-                ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
-            builder.add(DisplayData.item(entry.getKey(), repr));
-          }
-        }
-      }
     }
 
     @Override
@@ -880,6 +1003,131 @@ public class AvroIO {
     }
   }
 
+  /**
+   * This class is used as the default return value of {@link AvroIO#write}.
+   *
+   * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}.
+   * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @VisibleForTesting TypedWrite<T, T> inner;
+
+    Write(TypedWrite<T, T> inner) {
+      this.inner = inner;
+    }
+
+    /** See {@link TypedWrite#to(String)}. */
+    public Write<T> to(String outputPrefix) {
+      return new Write<>(
+          inner
+              .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
+              .withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(ResourceId)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> to(ResourceId outputPrefix) {
+      return new Write<T>(
+          inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(ValueProvider)}. */
+    public Write<T> to(ValueProvider<String> outputPrefix) {
+      return new Write<>(
+          inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#toResource(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+      return new Write<>(
+          inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(FilenamePolicy)}. */
+    public Write<T> to(FilenamePolicy filenamePolicy) {
+      return new Write<>(
+          inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */
+    public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
+      return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
+    }
+
+    /** See {@link TypedWrite#withSchema}. */
+    public Write<T> withSchema(Schema schema) {
+      return new Write<>(inner.withSchema(schema));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+      return new Write<>(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+    public Write<T> withTempDirectory(ResourceId tempDirectory) {
+      return new Write<>(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withShardNameTemplate}. */
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return new Write<>(inner.withShardNameTemplate(shardTemplate));
+    }
+
+    /** See {@link TypedWrite#withSuffix}. */
+    public Write<T> withSuffix(String filenameSuffix) {
+      return new Write<>(inner.withSuffix(filenameSuffix));
+    }
+
+    /** See {@link TypedWrite#withNumShards}. */
+    public Write<T> withNumShards(int numShards) {
+      return new Write<>(inner.withNumShards(numShards));
+    }
+
+    /** See {@link TypedWrite#withoutSharding}. */
+    public Write<T> withoutSharding() {
+      return new Write<>(inner.withoutSharding());
+    }
+
+    /** See {@link TypedWrite#withWindowedWrites}. */
+    public Write<T> withWindowedWrites() {
+      return new Write<>(inner.withWindowedWrites());
+    }
+
+    /** See {@link TypedWrite#withCodec}. */
+    public Write<T> withCodec(CodecFactory codec) {
+      return new Write<>(inner.withCodec(codec));
+    }
+
+    /** See {@link TypedWrite#withMetadata}. */
+    public Write<T> withMetadata(Map<String, Object> metadata) {
+      return new Write<>(inner.withMetadata(metadata));
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      return inner.expand(input);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      inner.populateDisplayData(builder);
+    }
+  }
+
+  /**
+   * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy},
+   * schema, metadata, and codec.
+   */
+  public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
+      FilenamePolicy filenamePolicy,
+      Schema schema,
+      Map<String, Object> metadata,
+      CodecFactory codec,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /** Disallow construction of utility class. */
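
A minimal usage sketch of the Write<T> wrapper above, assuming the AvroIO.write(Class)
entry point; the record class, output path, shard count, and metadata values here are
hypothetical:

    // Sketch only: MyRecord and all paths are hypothetical.
    import com.google.common.collect.ImmutableMap;
    import org.apache.avro.file.CodecFactory;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.values.PCollection;

    public class AvroWriteSketch {
      /** A reflect-serialized POJO; any Avro-compatible class works here. */
      public static class MyRecord {
        public String name;
        public long count;
      }

      public static void write(PCollection<MyRecord> records) {
        records.apply(
            AvroIO.write(MyRecord.class)
                .to("/tmp/avro/records") // filename prefix; shard info is appended
                .withSuffix(".avro")
                .withNumShards(3) // 0 lets the system decide
                .withCodec(CodecFactory.snappyCodec())
                // Values may be String, Long, or byte[], as checked in withMetadata.
                .withMetadata(ImmutableMap.<String, Object>of("origin", "sketch")));
      }
    }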

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index c78870b..acd3ea6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -17,93 +17,90 @@
  */
 package org.apache.beam.sdk.io;
 
-import com.google.common.collect.ImmutableMap;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
-  private final AvroCoder<T> coder;
-  private final SerializableAvroCodecFactory codec;
-  private final ImmutableMap<String, Object> metadata;
+class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, DestinationT, OutputT> {
+  private final DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations;
+  private final boolean genericRecords;
 
   AvroSink(
       ValueProvider<ResourceId> outputPrefix,
-      DynamicDestinations<T, DestinationT> dynamicDestinations,
-      AvroCoder<T> coder,
-      SerializableAvroCodecFactory codec,
-      ImmutableMap<String, Object> metadata) {
+      DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
+      boolean genericRecords) {
     // Avro handles compression internally using the codec.
     super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
-    this.coder = coder;
-    this.codec = codec;
-    this.metadata = metadata;
+    this.dynamicDestinations = dynamicDestinations;
+    this.genericRecords = genericRecords;
   }
 
   @Override
-  public WriteOperation<T, DestinationT> createWriteOperation() {
-    return new AvroWriteOperation<>(this, coder, codec, metadata);
+  public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+    return (DynamicAvroDestinations<UserT, DestinationT, OutputT>) super.getDynamicDestinations();
+  }
+
+  @Override
+  public WriteOperation<DestinationT, OutputT> createWriteOperation() {
+    return new AvroWriteOperation<>(this, genericRecords);
   }
 
   /** A {@link WriteOperation WriteOperation} for Avro files. */
-  private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
-    private final AvroCoder<T> coder;
-    private final SerializableAvroCodecFactory codec;
-    private final ImmutableMap<String, Object> metadata;
+  private static class AvroWriteOperation<DestinationT, OutputT>
+      extends WriteOperation<DestinationT, OutputT> {
+    private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+    private final boolean genericRecords;
 
-    private AvroWriteOperation(
-        AvroSink<T, DestinationT> sink,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+    private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
       super(sink);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
+      this.dynamicDestinations = sink.getDynamicDestinations();
+      this.genericRecords = genericRecords;
     }
 
     @Override
-    public Writer<T, DestinationT> createWriter() throws Exception {
-      return new AvroWriter<>(this, coder, codec, metadata);
+    public Writer<DestinationT, OutputT> createWriter() throws Exception {
+      return new AvroWriter<>(this, dynamicDestinations, genericRecords);
     }
   }
 
   /** A {@link Writer Writer} for Avro files. */
-  private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
-    private final AvroCoder<T> coder;
-    private DataFileWriter<T> dataFileWriter;
-    private SerializableAvroCodecFactory codec;
-    private final ImmutableMap<String, Object> metadata;
+  private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT, OutputT> {
+    private DataFileWriter<OutputT> dataFileWriter;
+    private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+    private final boolean genericRecords;
 
     public AvroWriter(
-        WriteOperation<T, DestinationT> writeOperation,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+        WriteOperation<DestinationT, OutputT> writeOperation,
+        DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations,
+        boolean genericRecords) {
       super(writeOperation, MimeTypes.BINARY);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
+      this.dynamicDestinations = dynamicDestinations;
+      this.genericRecords = genericRecords;
     }
 
     @SuppressWarnings("deprecation") // uses internal test functionality.
     @Override
     protected void prepareWrite(WritableByteChannel channel) throws Exception {
-      DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
-          ? new GenericDatumWriter<T>(coder.getSchema())
-          : new ReflectDatumWriter<T>(coder.getSchema());
+      DestinationT destination = getDestination();
+      CodecFactory codec = dynamicDestinations.getCodec(destination);
+      Schema schema = dynamicDestinations.getSchema(destination);
+      Map<String, Object> metadata = dynamicDestinations.getMetadata(destination);
 
-      dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+      DatumWriter<OutputT> datumWriter =
+          genericRecords
+              ? new GenericDatumWriter<OutputT>(schema)
+              : new ReflectDatumWriter<OutputT>(schema);
+      dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec);
       for (Map.Entry<String, Object> entry : metadata.entrySet()) {
         Object v = entry.getValue();
         if (v instanceof String) {
@@ -118,11 +115,11 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
                   + v.getClass().getSimpleName());
         }
       }
-      dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+      dataFileWriter.create(schema, Channels.newOutputStream(channel));
     }
 
     @Override
-    public void write(T value) throws Exception {
+    public void write(OutputT value) throws Exception {
       dataFileWriter.append(value);
     }
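
For context on what AvroWriter.prepareWrite drives: the same DataFileWriter sequence
(set codec, set metadata, create with schema, append) in plain Avro, outside Beam. The
schema and file path here are illustrative only:

    // Sketch: the plain Avro container-file API used by AvroWriter above.
    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class DataFileWriterSketch {
      public static void main(String[] args) throws IOException {
        Schema schema =
            new Schema.Parser()
                .parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                    + "[{\"name\":\"name\",\"type\":\"string\"}]}");
        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))
                .setCodec(CodecFactory.snappyCodec()); // same role as getCodec(destination)
        writer.setMeta("origin", "sketch");            // same role as getMetadata(destination)
        writer.create(schema, new File("/tmp/users.avro"));
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "beam");
        writer.append(record);
        writer.close();
      }
    }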
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
new file mode 100644
index 0000000..b006e26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
+/** Always returns a constant {@link FilenamePolicy}, {@link Schema}, metadata, and codec. */
+class ConstantAvroDestination<UserT, OutputT>
+    extends DynamicAvroDestinations<UserT, Void, OutputT> {
+  private static class SchemaFunction implements Serializable, Function<String, Schema> {
+    @Nullable
+    @Override
+    public Schema apply(@Nullable String input) {
+      return new Schema.Parser().parse(input);
+    }
+  }
+
+  // This should be a multiple of 4 to avoid truncating a partially encoded byte.
+  private static final int METADATA_BYTES_MAX_LENGTH = 40;
+  private final FilenamePolicy filenamePolicy;
+  private final Supplier<Schema> schema;
+  private final Map<String, Object> metadata;
+  private final SerializableAvroCodecFactory codec;
+  private final SerializableFunction<UserT, OutputT> formatFunction;
+
+  private class Metadata implements HasDisplayData {
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        DisplayData.Type type = DisplayData.inferType(entry.getValue());
+        if (type != null) {
+          builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+        } else {
+          String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+          String repr =
+              base64.length() <= METADATA_BYTES_MAX_LENGTH
+                  ? base64
+                  : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+          builder.add(DisplayData.item(entry.getKey(), repr));
+        }
+      }
+    }
+  }
+
+  public ConstantAvroDestination(
+      FilenamePolicy filenamePolicy,
+      Schema schema,
+      Map<String, Object> metadata,
+      CodecFactory codec,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    this.filenamePolicy = filenamePolicy;
+    this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString()));
+    this.metadata = metadata;
+    this.codec = new SerializableAvroCodecFactory(codec);
+    this.formatFunction = formatFunction;
+  }
+
+  @Override
+  public OutputT formatRecord(UserT record) {
+    return formatFunction.apply(record);
+  }
+
+  @Override
+  public Void getDestination(UserT element) {
+    return (Void) null;
+  }
+
+  @Override
+  public Void getDefaultDestination() {
+    return (Void) null;
+  }
+
+  @Override
+  public FilenamePolicy getFilenamePolicy(Void destination) {
+    return filenamePolicy;
+  }
+
+  @Override
+  public Schema getSchema(Void destination) {
+    return schema.get();
+  }
+
+  @Override
+  public Map<String, Object> getMetadata(Void destination) {
+    return metadata;
+  }
+
+  @Override
+  public CodecFactory getCodec(Void destination) {
+    return codec.getCodec();
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    filenamePolicy.populateDisplayData(builder);
+    builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema"));
+    builder.addIfNotDefault(
+        DisplayData.item("codec", codec.getCodec().toString()).withLabel("Avro Compression Codec"),
+        AvroIO.TypedWrite.DEFAULT_SERIALIZABLE_CODEC.toString());
+    builder.include("Metadata", new Metadata());
+  }
+}
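
A note on the Supplier<Schema> field above: Avro's Schema is not Serializable, so
ConstantAvroDestination captures the schema's JSON form and re-parses it lazily via
Suppliers.compose. The same pattern in isolation; the holder class and method names
are made up:

    // Sketch only: round-trips a Schema through its JSON representation.
    import com.google.common.base.Function;
    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;
    import java.io.Serializable;
    import org.apache.avro.Schema;

    public class SerializableSchemaHolder implements Serializable {
      private static class ParseSchema implements Serializable, Function<String, Schema> {
        @Override
        public Schema apply(String json) {
          return new Schema.Parser().parse(json);
        }
      }

      private final Supplier<Schema> schema;

      public SerializableSchemaHolder(Schema schema) {
        // Capture the JSON string (serializable); re-parse on each get().
        // Wrapping with Suppliers.memoize(...) would parse only once per worker.
        this.schema =
            Suppliers.compose(new ParseSchema(), Suppliers.ofInstance(schema.toString()));
      }

      public Schema get() {
        return schema.get();
      }
    }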

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 4021609..1f438d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -157,7 +157,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
           && shardTemplate.equals(other.shardTemplate)
           && suffix.equals(other.suffix);
     }
-
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
new file mode 100644
index 0000000..f4e8ee6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link AvroIO}. In addition to dynamic file
+ * destinations, this allows specifying other Avro properties (schema, metadata, codec) per
+ * destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+    extends DynamicDestinations<UserT, DestinationT, OutputT> {
+  /** Return an Avro schema for a given destination. */
+  public abstract Schema getSchema(DestinationT destination);
+
+  /** Return Avro file metadata for a given destination. */
+  public Map<String, Object> getMetadata(DestinationT destination) {
+    return ImmutableMap.<String, Object>of();
+  }
+
+  /** Return an Avro codec for a given destination. */
+  public CodecFactory getCodec(DestinationT destination) {
+    return AvroIO.TypedWrite.DEFAULT_CODEC;
+  }
+}
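
A sketch of a concrete subclass, routing GenericRecords to per-type directories with
per-type schemas. The base directory, the "type" routing field, and the schema map are
assumptions for illustration:

    // Sketch only: routes on a "type" field; schemas are kept as JSON strings
    // because Avro's Schema is not Serializable.
    import java.util.Map;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.DynamicAvroDestinations;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    class ByTypeDestinations
        extends DynamicAvroDestinations<GenericRecord, String, GenericRecord> {
      private final String baseDir;
      // Must be a serializable Map implementation (e.g. HashMap); include an
      // entry for the default destination as well.
      private final Map<String, String> schemaJsonByType;

      ByTypeDestinations(String baseDir, Map<String, String> schemaJsonByType) {
        this.baseDir = baseDir;
        this.schemaJsonByType = schemaJsonByType;
      }

      @Override
      public GenericRecord formatRecord(GenericRecord record) {
        return record; // already in output form
      }

      @Override
      public String getDestination(GenericRecord record) {
        return record.get("type").toString();
      }

      @Override
      public String getDefaultDestination() {
        return "unknown";
      }

      @Override
      public FilenamePolicy getFilenamePolicy(String type) {
        return DefaultFilenamePolicy.fromStandardParameters(
            StaticValueProvider.of(
                FileBasedSink.convertToFileResourceIfPossible(baseDir + "/" + type + "/part")),
            null /* default shard template */,
            ".avro",
            false /* unwindowed */);
      }

      @Override
      public Schema getSchema(String type) {
        return new Schema.Parser().parse(schemaJsonByType.get(type));
      }
    }

Such an object is wired in through the to(DynamicAvroDestinations) overload together with
withTempDirectory, which the javadoc above requires when no filename prefix is set.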

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index d05a01a7..b087bc5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import javax.annotation.Nullable;
@@ -28,20 +27,30 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 /** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
 public class DynamicFileDestinations {
   /** Always returns a constant {@link FilenamePolicy}. */
-  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+  private static class ConstantFilenamePolicy<UserT, OutputT>
+      extends DynamicDestinations<UserT, Void, OutputT> {
     private final FilenamePolicy filenamePolicy;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
-    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = checkNotNull(filenamePolicy);
+    public ConstantFilenamePolicy(
+        FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+      this.filenamePolicy = filenamePolicy;
+      this.formatFunction = formatFunction;
     }
 
     @Override
-    public Void getDestination(T element) {
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
+    }
+
+    @Override
+    public Void getDestination(UserT element) {
       return (Void) null;
     }
 
@@ -71,14 +80,24 @@ public class DynamicFileDestinations {
    * A base class for a {@link DynamicDestinations} object that returns differently-configured
    * instances of {@link DefaultFilenamePolicy}.
    */
-  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
-    SerializableFunction<UserT, Params> destinationFunction;
-    Params emptyDestination;
+  private static class DefaultPolicyDestinations<UserT, OutputT>
+      extends DynamicDestinations<UserT, Params, OutputT> {
+    private final SerializableFunction<UserT, Params> destinationFunction;
+    private final Params emptyDestination;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
     public DefaultPolicyDestinations(
-        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+        SerializableFunction<UserT, Params> destinationFunction,
+        Params emptyDestination,
+        SerializableFunction<UserT, OutputT> formatFunction) {
       this.destinationFunction = destinationFunction;
       this.emptyDestination = emptyDestination;
+      this.formatFunction = formatFunction;
+    }
+
+    @Override
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
     }
 
     @Override
@@ -104,16 +123,28 @@ public class DynamicFileDestinations {
   }
 
   /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
-  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
-    return new ConstantFilenamePolicy<>(filenamePolicy);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Void, OutputT> constant(
+      FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, formatFunction);
+  }
+
+  /**
+   * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
+   * UserT and OutputT are the same type and the format function is the identity.
+   */
+  public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
+      FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
   }
 
   /**
    * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
    * configured with the given {@link Params}.
    */
-  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
-      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
-    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Params, OutputT> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction,
+      Params emptyDestination,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination, formatFunction);
   }
 }
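
A sketch of the constant(FilenamePolicy, SerializableFunction) overload added above,
pairing a default filename policy with a format function; the Event type and output
prefix are hypothetical:

    // Sketch only: a constant destination whose format function renders a
    // hypothetical Event as a CSV line.
    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.DynamicFileDestinations;
    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    public class CsvDestinations {
      static class Event {
        String id;
        long timestamp;
      }

      static DynamicDestinations<Event, Void, String> create(String prefix) {
        FilenamePolicy policy =
            DefaultFilenamePolicy.fromStandardParameters(
                StaticValueProvider.of(FileBasedSink.convertToFileResourceIfPossible(prefix)),
                null /* default shard template */,
                ".csv",
                false /* unwindowed */);
        return DynamicFileDestinations.constant(
            policy,
            new SerializableFunction<Event, String>() {
              @Override
              public String apply(Event event) {
                return event.id + "," + event.timestamp;
              }
            });
      }
    }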


[2/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.

Posted by jk...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 3bf5d5b..4e2b61c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -27,8 +27,10 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,6 +68,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -74,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
@@ -95,9 +98,9 @@ import org.slf4j.LoggerFactory;
  * <p>The process of writing to file-based sink is as follows:
  *
  * <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ *   <li>An optional subclass-defined initialization,
+ *   <li>a parallel write of bundles to temporary files, and finally,
+ *   <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
@@ -121,7 +124,8 @@ import org.slf4j.LoggerFactory;
  * @param <OutputT> the type of values written to the sink.
  */
 @Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<UserT, DestinationT, OutputT>
+    implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
   /** Directly supported file output compression types. */
@@ -199,7 +203,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
   }
 
-  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+  private final DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations;
 
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
@@ -215,8 +219,54 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
    * destination type into an instance of {@link FilenamePolicy}.
    */
   @Experimental(Kind.FILESYSTEM)
-  public abstract static class DynamicDestinations<UserT, DestinationT>
+  public abstract static class DynamicDestinations<UserT, DestinationT, OutputT>
       implements HasDisplayData, Serializable {
+    interface SideInputAccessor {
+      <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
+    }
+
+    private SideInputAccessor sideInputAccessor;
+
+    static class SideInputAccessorViaProcessContext implements SideInputAccessor {
+      private DoFn<?, ?>.ProcessContext processContext;
+
+      SideInputAccessorViaProcessContext(DoFn<?, ?>.ProcessContext processContext) {
+        this.processContext = processContext;
+      }
+
+      @Override
+      public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+        return processContext.sideInput(view);
+      }
+    }
+
+    /**
+     * Override to specify that this object needs access to one or more side inputs. These side
+     * inputs must be globally windowed, as they will be accessed from the global window.
+     */
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.of();
+    }
+
+    /**
+     * Returns the value of a given side input. The view must be present in {@link
+     * #getSideInputs()}.
+     */
+    protected final <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
+      return sideInputAccessor.sideInput(view);
+    }
+
+    final void setSideInputAccessor(SideInputAccessor sideInputAccessor) {
+      this.sideInputAccessor = sideInputAccessor;
+    }
+
+    final void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
+      this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
+    }
+
+    /** Convert an input record type into the output type. */
+    public abstract OutputT formatRecord(UserT record);
+
     /**
      * Returns an object that represents at a high level the destination being written to. May not
      * return null. A destination must have deterministic hash and equality methods defined.
@@ -256,12 +306,13 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
         return destinationCoder;
       }
       // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
-      @Nullable TypeDescriptor<DestinationT> descriptor =
+      @Nullable
+      TypeDescriptor<DestinationT> descriptor =
           extractFromTypeParameters(
               this,
               DynamicDestinations.class,
               new TypeVariableExtractor<
-                  DynamicDestinations<UserT, DestinationT>, DestinationT>() {});
+                  DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
       checkArgument(
           descriptor != null,
           "Unable to infer a coder for DestinationT, "
@@ -323,7 +374,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
+      DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations) {
     this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
   }
 
@@ -331,7 +382,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
       ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      DynamicDestinations<?, DestinationT, OutputT> dynamicDestinations,
       WritableByteChannelFactory writableByteChannelFactory) {
     this.tempDirectoryProvider =
         NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
@@ -341,8 +392,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
 
   /** Return the {@link DynamicDestinations} used. */
   @SuppressWarnings("unchecked")
-  public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
-    return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
+  public DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+    return (DynamicDestinations<UserT, DestinationT, OutputT>) dynamicDestinations;
   }
 
   /**
@@ -357,7 +408,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   public void validate(PipelineOptions options) {}
 
   /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
-  public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
+  public abstract WriteOperation<DestinationT, OutputT> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
     getDynamicDestinations().populateDisplayData(builder);
@@ -371,11 +422,11 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
    * written,
    *
    * <ol>
-   * <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
-   *     output bundles.
-   * <li>During finalize, these temporary files are copied to final output locations and named
-   *     according to a file naming template.
-   * <li>Finally, any temporary files that were created during the write are removed.
+   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+   *       output bundles.
+   *   <li>During finalize, these temporary files are copied to final output locations and named
+   *       according to a file naming template.
+   *   <li>Finally, any temporary files that were created during the write are removed.
    * </ol>
    *
    * <p>Subclass implementations of WriteOperation must implement {@link
@@ -400,9 +451,9 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
    *
    * @param <OutputT> the type of values written to the sink.
    */
-  public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
+  public abstract static class WriteOperation<DestinationT, OutputT> implements Serializable {
     /** The Sink that this WriteOperation will write to. */
-    protected final FileBasedSink<OutputT, DestinationT> sink;
+    protected final FileBasedSink<?, DestinationT, OutputT> sink;
 
     /** Directory for temporary output files. */
     protected final ValueProvider<ResourceId> tempDirectory;
@@ -428,7 +479,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
+    public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink) {
       this(
           sink,
           NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
@@ -463,12 +514,12 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * @param tempDirectory the base directory to be used for temporary output files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
+    public WriteOperation(FileBasedSink<?, DestinationT, OutputT> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
     private WriteOperation(
-        FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
+        FileBasedSink<?, DestinationT, OutputT> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
@@ -478,7 +529,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * Clients must implement to return a subclass of {@link Writer}. This method must not mutate
      * the state of the object.
      */
-    public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
+    public abstract Writer<DestinationT, OutputT> createWriter() throws Exception;
 
     /** Indicates that the operation will be performing windowed writes. */
     public void setWindowedWrites(boolean windowedWrites) {
@@ -533,7 +584,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
         Iterable<FileResult<DestinationT>> writerResults) {
       int numShards = Iterables.size(writerResults);
-      Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+      Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap();
 
       // Either all results have a shard number set (if the sink is configured with a fixed
       // number of shards), or they all don't (otherwise).
@@ -597,7 +648,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
           "Only generated %s distinct file names for %s files.",
           numDistinctShards,
           outputFilenames.size());
-
       return outputFilenames;
     }
 
@@ -691,7 +741,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     /** Returns the FileBasedSink for this write operation. */
-    public FileBasedSink<OutputT, DestinationT> getSink() {
+    public FileBasedSink<?, DestinationT, OutputT> getSink() {
       return sink;
     }
 
@@ -727,10 +777,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
    *
    * @param <OutputT> the type of values to write.
    */
-  public abstract static class Writer<OutputT, DestinationT> {
+  public abstract static class Writer<DestinationT, OutputT> {
     private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
-    private final WriteOperation<OutputT, DestinationT> writeOperation;
+    private final WriteOperation<DestinationT, OutputT> writeOperation;
 
     /** Unique id for this output bundle. */
     private String id;
@@ -757,7 +807,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     private final String mimeType;
 
     /** Construct a new {@link Writer} that will produce files of the given MIME type. */
-    public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
+    public Writer(WriteOperation<DestinationT, OutputT> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
       this.mimeType = mimeType;
@@ -930,9 +980,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     /** Return the WriteOperation that this Writer belongs to. */
-    public WriteOperation<OutputT, DestinationT> getWriteOperation() {
+    public WriteOperation<DestinationT, OutputT> getWriteOperation() {
       return writeOperation;
     }
+
+    /** Return the user destination object for this writer. */
+    public DestinationT getDestination() {
+      return destination;
+    }
   }
 
   /**
@@ -987,7 +1042,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
 
     @Experimental(Kind.FILESYSTEM)
     public ResourceId getDestinationFile(
-        DynamicDestinations<?, DestinationT> dynamicDestinations,
+        DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
         int numShards,
         OutputFileHints outputFileHints) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);
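
A sketch of the new side-input hooks above: a DynamicDestinations that routes records
through a map-valued side input. The routing view and record format are assumptions,
and getFilenamePolicy is left abstract for brevity:

    // Sketch only: getSideInputs() must list every view read via sideInput(),
    // and each view must be globally windowed.
    import com.google.common.collect.ImmutableList;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
    import org.apache.beam.sdk.values.PCollectionView;

    abstract class RoutedDestinations extends DynamicDestinations<String, String, String> {
      private final PCollectionView<Map<String, String>> routingTable;

      RoutedDestinations(PCollectionView<Map<String, String>> routingTable) {
        this.routingTable = routingTable;
      }

      @Override
      public List<PCollectionView<?>> getSideInputs() {
        return ImmutableList.<PCollectionView<?>>of(routingTable);
      }

      @Override
      public String formatRecord(String record) {
        return record;
      }

      @Override
      public String getDestination(String record) {
        // Route on the first comma-separated field, falling back to the default.
        String key = record.split(",", 2)[0];
        String dest = sideInput(routingTable).get(key);
        return dest != null ? dest : getDefaultDestination();
      }

      @Override
      public String getDefaultDestination() {
        return "default";
      }
      // getFilenamePolicy(String) is left to a concrete subclass.
    }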

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6e7b243..29b3e29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -357,10 +356,12 @@ public class TFRecordIO {
       checkState(getOutputPrefix() != null,
           "need to set the output prefix of a TFRecordIO.Write transform");
       WriteFiles<byte[], Void, byte[]> write =
-          WriteFiles.<byte[], Void, byte[]>to(
+          WriteFiles.to(
               new TFRecordSink(
-                  getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
-              SerializableFunctions.<byte[]>identity());
+                  getOutputPrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -548,7 +549,7 @@ public class TFRecordIO {
 
   /** A {@link FileBasedSink} that produces TFRecord files. */
   @VisibleForTesting
-  static class TFRecordSink extends FileBasedSink<byte[], Void> {
+  static class TFRecordSink extends FileBasedSink<byte[], Void, byte[]> {
     @VisibleForTesting
     TFRecordSink(
         ValueProvider<ResourceId> outputPrefix,
@@ -557,7 +558,7 @@ public class TFRecordIO {
         TFRecordIO.CompressionType compressionType) {
       super(
           outputPrefix,
-          DynamicFileDestinations.constant(
+          DynamicFileDestinations.<byte[]>constant(
               DefaultFilenamePolicy.fromStandardParameters(
                   outputPrefix, shardTemplate, suffix, false)),
           writableByteChannelFactory(compressionType));
@@ -571,7 +572,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public WriteOperation<byte[], Void> createWriteOperation() {
+    public WriteOperation<Void, byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 
@@ -591,23 +592,23 @@ public class TFRecordIO {
     }
 
     /** A {@link WriteOperation WriteOperation} for TFRecord files. */
-    private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
+    private static class TFRecordWriteOperation extends WriteOperation<Void, byte[]> {
       private TFRecordWriteOperation(TFRecordSink sink) {
         super(sink);
       }
 
       @Override
-      public Writer<byte[], Void> createWriter() throws Exception {
+      public Writer<Void, byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }
 
     /** A {@link Writer Writer} for TFRecord files. */
-    private static class TFRecordWriter extends Writer<byte[], Void> {
+    private static class TFRecordWriter extends Writer<Void, byte[]> {
       private WritableByteChannel outChannel;
       private TFRecordCodec codec;
 
-      private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
+      private TFRecordWriter(WriteOperation<Void, byte[]> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 765a842..312dc07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -23,6 +23,10 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -52,8 +56,8 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
- * file(s) to be read. Alternatively, if the filenames to be read are themselves in a
- * {@link PCollection}, apply {@link TextIO#readAll()}.
+ * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
+ * PCollection}, apply {@link TextIO#readAll()}.
  *
  * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
  * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
@@ -70,8 +74,8 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
  * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small
- * number of files.
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
  *
  * <p>Example 2: reading a PCollection of filenames.
  *
@@ -121,9 +125,9 @@ import org.apache.beam.sdk.values.PDone;
  * allows you to convert any input value into a custom destination object, and map that destination
  * object to a {@link FilenamePolicy}. This allows using different filename policies (or more
  * commonly, differently-configured instances of the same policy) based on the input record. Often
- * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which
- * allows your {@link DynamicDestinations} object to examine the input type and takes a format
- * function to convert that type to a string for writing.
+ * this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link
+ * DynamicDestinations} object to examine the input type; a format function (or {@link
+ * DynamicDestinations#formatRecord(Object)}) then converts that type to a string for writing.
  *
  * <p>A convenience shortcut is provided for the case where the default naming policy is used, but
  * different configurations of this policy are wanted based on the input record. Default naming
@@ -189,20 +193,23 @@ public class TextIO {
    * line.
    *
    * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type
-   * {@link T}, along with a format function that converts the input type {@link T} to the String
-   * that will be written to the file. The advantage of this is it allows a user-provided {@link
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the String
+   * that will be written to the file must be specified: if a custom {@link DynamicDestinations}
+   * object is used, this is done with {@link DynamicDestinations#formatRecord}; otherwise,
+   * {@link TypedWrite#withFormatFunction} can be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that it allows a user-provided {@link
    * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the
-   * user's custom type when choosing a destination.
+   * custom type when choosing a destination.
    */
-  public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) {
-    return new AutoValue_TextIO_TypedWrite.Builder<T>()
+  public static <UserT> TypedWrite<UserT> writeCustomType() {
+    return new AutoValue_TextIO_TypedWrite.Builder<UserT>()
         .setFilenamePrefix(null)
         .setTempDirectory(null)
         .setShardTemplate(null)
         .setFilenameSuffix(null)
         .setFilenamePolicy(null)
         .setDynamicDestinations(null)
-        .setFormatFunction(formatFunction)
         .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
         .setNumShards(0)
@@ -417,11 +424,11 @@ public class TextIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> {
+  public abstract static class TypedWrite<UserT> extends PTransform<PCollection<UserT>, PDone> {
     /** The prefix of each file written, combined with suffix and shardTemplate. */
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
@@ -449,10 +456,19 @@ public class TextIO {
 
     /** Allows for value-dependent {@link DynamicDestinations} to be vended. */
     @Nullable
-    abstract DynamicDestinations<T, ?> getDynamicDestinations();
+    abstract DynamicDestinations<UserT, ?, String> getDynamicDestinations();
+
+    /** A destination function for use with {@link DefaultFilenamePolicy}. */
+    @Nullable
+    abstract SerializableFunction<UserT, Params> getDestinationFunction();
 
-    /** A function that converts T to a String, for writing to the file. */
-    abstract SerializableFunction<T, String> getFormatFunction();
+    /** A default destination for empty PCollections. */
+    @Nullable
+    abstract Params getEmptyDestination();
+
+    /** A function that converts UserT to a String, for writing to the file. */
+    @Nullable
+    abstract SerializableFunction<UserT, String> getFormatFunction();
 
     /** Whether to write windowed output files. */
     abstract boolean getWindowedWrites();
@@ -463,37 +479,42 @@ public class TextIO {
      */
     abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    abstract Builder<T> toBuilder();
+    abstract Builder<UserT> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+    abstract static class Builder<UserT> {
+      abstract Builder<UserT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+
+      abstract Builder<UserT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
 
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+      abstract Builder<UserT> setShardTemplate(@Nullable String shardTemplate);
 
-      abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
+      abstract Builder<UserT> setFilenameSuffix(@Nullable String filenameSuffix);
 
-      abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
+      abstract Builder<UserT> setHeader(@Nullable String header);
 
-      abstract Builder<T> setHeader(@Nullable String header);
+      abstract Builder<UserT> setFooter(@Nullable String footer);
 
-      abstract Builder<T> setFooter(@Nullable String footer);
+      abstract Builder<UserT> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
 
-      abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
+      abstract Builder<UserT> setDynamicDestinations(
+          @Nullable DynamicDestinations<UserT, ?, String> dynamicDestinations);
 
-      abstract Builder<T> setDynamicDestinations(
-          @Nullable DynamicDestinations<T, ?> dynamicDestinations);
+      abstract Builder<UserT> setDestinationFunction(
+          @Nullable SerializableFunction<UserT, Params> destinationFunction);
 
-      abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction);
+      abstract Builder<UserT> setEmptyDestination(Params emptyDestination);
 
-      abstract Builder<T> setNumShards(int numShards);
+      abstract Builder<UserT> setFormatFunction(SerializableFunction<UserT, String> formatFunction);
 
-      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+      abstract Builder<UserT> setNumShards(int numShards);
 
-      abstract Builder<T> setWritableByteChannelFactory(
+      abstract Builder<UserT> setWindowedWrites(boolean windowedWrites);
+
+      abstract Builder<UserT> setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
 
-      abstract TypedWrite<T> build();
+      abstract TypedWrite<UserT> build();
     }
 
     /**
@@ -513,18 +534,18 @@ public class TextIO {
      * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
      * infer a directory for temporary files.
      */
-    public TypedWrite<T> to(String filenamePrefix) {
+    public TypedWrite<UserT> to(String filenamePrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
     /** Like {@link #to(String)}. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> to(ResourceId filenamePrefix) {
+    public TypedWrite<UserT> to(ResourceId filenamePrefix) {
       return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
     /** Like {@link #to(String)}. */
-    public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
+    public TypedWrite<UserT> to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -538,7 +559,7 @@ public class TextIO {
      * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
      * directory for temporary files must be specified using {@link #withTempDirectory}.
      */
-    public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
+    public TypedWrite<UserT> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
@@ -547,7 +568,7 @@ public class TextIO {
      * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
      * temporary files must be specified using {@link #withTempDirectory}.
      */
-    public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+    public TypedWrite<UserT> to(DynamicDestinations<UserT, ?, String> dynamicDestinations) {
       return toBuilder().setDynamicDestinations(dynamicDestinations).build();
     }
 
@@ -558,26 +579,39 @@ public class TextIO {
     * emptyDestination parameter specifies where empty files should be written when the written
     * {@link PCollection} is empty.
      */
-    public TypedWrite<T> to(
-        SerializableFunction<T, Params> destinationFunction, Params emptyDestination) {
-      return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination));
+    public TypedWrite<UserT> to(
+        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+      return toBuilder()
+          .setDestinationFunction(destinationFunction)
+          .setEmptyDestination(emptyDestination)
+          .build();
     }
 
     /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
+    public TypedWrite<UserT> toResource(ValueProvider<ResourceId> filenamePrefix) {
       return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
+    /**
+     * Specifies a format function to convert {@link UserT} to the output type. If {@link
+     * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be
+     * used instead.
+     */
+    public TypedWrite<UserT> withFormatFunction(
+        SerializableFunction<UserT, String> formatFunction) {
+      return toBuilder().setFormatFunction(formatFunction).build();
+    }
+
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+    public TypedWrite<UserT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
       return toBuilder().setTempDirectory(tempDirectory).build();
     }
 
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
+    public TypedWrite<UserT> withTempDirectory(ResourceId tempDirectory) {
       return withTempDirectory(StaticValueProvider.of(tempDirectory));
     }
 
@@ -589,7 +623,7 @@ public class TextIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
+    public TypedWrite<UserT> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
@@ -601,7 +635,7 @@ public class TextIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public TypedWrite<T> withSuffix(String filenameSuffix) {
+    public TypedWrite<UserT> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
@@ -615,7 +649,7 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public TypedWrite<T> withNumShards(int numShards) {
+    public TypedWrite<UserT> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
@@ -629,7 +663,7 @@ public class TextIO {
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public TypedWrite<T> withoutSharding() {
+    public TypedWrite<UserT> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -638,7 +672,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured header.
      */
-    public TypedWrite<T> withHeader(@Nullable String header) {
+    public TypedWrite<UserT> withHeader(@Nullable String header) {
       return toBuilder().setHeader(header).build();
     }
 
@@ -647,7 +681,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured footer.
      */
-    public TypedWrite<T> withFooter(@Nullable String footer) {
+    public TypedWrite<UserT> withFooter(@Nullable String footer) {
       return toBuilder().setFooter(footer).build();
     }
 
@@ -658,7 +692,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will reset the value to the default value mentioned above.
      */
-    public TypedWrite<T> withWritableByteChannelFactory(
+    public TypedWrite<UserT> withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
       return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
@@ -669,36 +703,58 @@ public class TextIO {
     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
     * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
      */
-    public TypedWrite<T> withWindowedWrites() {
+    public TypedWrite<UserT> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
-    private DynamicDestinations<T, ?> resolveDynamicDestinations() {
-      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+    private DynamicDestinations<UserT, ?, String> resolveDynamicDestinations() {
+      DynamicDestinations<UserT, ?, String> dynamicDestinations = getDynamicDestinations();
       if (dynamicDestinations == null) {
-        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-        if (usedFilenamePolicy == null) {
-          usedFilenamePolicy =
-              DefaultFilenamePolicy.fromStandardParameters(
-                  getFilenamePrefix(),
-                  getShardTemplate(),
-                  getFilenameSuffix(),
-                  getWindowedWrites());
+        if (getDestinationFunction() != null) {
+          dynamicDestinations =
+              DynamicFileDestinations.toDefaultPolicies(
+                  getDestinationFunction(), getEmptyDestination(), getFormatFunction());
+        } else {
+          FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+          if (usedFilenamePolicy == null) {
+            usedFilenamePolicy =
+                DefaultFilenamePolicy.fromStandardParameters(
+                    getFilenamePrefix(),
+                    getShardTemplate(),
+                    getFilenameSuffix(),
+                    getWindowedWrites());
+          }
+          dynamicDestinations =
+              DynamicFileDestinations.constant(usedFilenamePolicy, getFormatFunction());
         }
-        dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy);
       }
       return dynamicDestinations;
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PDone expand(PCollection<UserT> input) {
       checkState(
           getFilenamePrefix() != null || getTempDirectory() != null,
           "Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
               + "transform.");
-      checkState(
-          getFilenamePolicy() == null || getDynamicDestinations() == null,
-          "Cannot specify both a filename policy and dynamic destinations");
+
+      List<?> allToArgs =
+          Lists.newArrayList(
+              getFilenamePolicy(),
+              getDynamicDestinations(),
+              getFilenamePrefix(),
+              getDestinationFunction());
+      checkArgument(
+          1 == Iterables.size(Iterables.filter(allToArgs, Predicates.notNull())),
+          "Exactly one of filename policy, dynamic destinations, filename prefix, or destination "
+              + "function must be set");
+
+      if (getDynamicDestinations() != null) {
+        checkArgument(
+            getFormatFunction() == null,
+            "A format function should not be specified "
+                + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+      }
       if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
         checkState(
             getShardTemplate() == null && getFilenameSuffix() == null,
@@ -709,20 +765,20 @@ public class TextIO {
     }
 
     public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+        PCollection<UserT> input,
+        DynamicDestinations<UserT, DestinationT, String> dynamicDestinations) {
       ValueProvider<ResourceId> tempDirectory = getTempDirectory();
       if (tempDirectory == null) {
         tempDirectory = getFilenamePrefix();
       }
-      WriteFiles<T, DestinationT, String> write =
+      WriteFiles<UserT, DestinationT, String> write =
           WriteFiles.to(
               new TextSink<>(
                   tempDirectory,
                   dynamicDestinations,
                   getHeader(),
                   getFooter(),
-                  getWritableByteChannelFactory()),
-              getFormatFunction());
+                  getWritableByteChannelFactory()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -774,7 +830,7 @@ public class TextIO {
     @VisibleForTesting TypedWrite<String> inner;
 
     Write() {
-      this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
+      this(TextIO.<String>writeCustomType());
     }
 
     Write(TypedWrite<String> inner) {
@@ -783,43 +839,53 @@ public class TextIO {
 
     /** See {@link TypedWrite#to(String)}. */
     public Write to(String filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
+      return new Write(
+          inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(ResourceId filenamePrefix) {
-      return new Write(inner.to(filenamePrefix));
+      return new Write(
+          inner.to(filenamePrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(ValueProvider)}. */
     public Write to(ValueProvider<String> outputPrefix) {
-      return new Write(inner.to(outputPrefix));
+      return new Write(
+          inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#toResource(ValueProvider)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
-      return new Write(inner.toResource(filenamePrefix));
+      return new Write(
+          inner
+              .toResource(filenamePrefix)
+              .withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(FilenamePolicy)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(FilenamePolicy filenamePolicy) {
-      return new Write(inner.to(filenamePolicy));
+      return new Write(
+          inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#to(DynamicDestinations)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
-      return new Write(inner.to(dynamicDestinations));
+    public Write to(DynamicDestinations<String, ?, String> dynamicDestinations) {
+      return new Write(inner.to(dynamicDestinations).withFormatFunction(null));
     }
 
     /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
     @Experimental(Kind.FILESYSTEM)
     public Write to(
         SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
-      return new Write(inner.to(destinationFunction, emptyDestination));
+      return new Write(
+          inner
+              .to(destinationFunction, emptyDestination)
+              .withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
     /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */

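In sum, TextIO.writeCustomType() no longer takes a format function: either the DynamicDestinations
object formats records via formatRecord, or withFormatFunction() supplies the conversion when the
destination-function/Params overload of to() is used. A sketch of the new custom-type path;
UserEvent, its getCountry() accessor, and all paths are illustrative assumptions (imports elided):

    static void writeEvents(PCollection<UserEvent> events, final ResourceId baseDir) {
      events.apply(
          TextIO.<UserEvent>writeCustomType()
              .to(
                  new SerializableFunction<UserEvent, Params>() {
                    @Override
                    public Params apply(UserEvent event) {
                      // Route each event to a file named after its country.
                      return new Params()
                          .withBaseFilename(
                              baseDir.resolve(
                                  event.getCountry(), StandardResolveOptions.RESOLVE_FILE));
                    }
                  },
                  // Where an (empty) file is written if the collection is empty.
                  new Params()
                      .withBaseFilename(
                          baseDir.resolve("empty", StandardResolveOptions.RESOLVE_FILE)))
              .withFormatFunction(
                  new SerializableFunction<UserEvent, String>() {
                    @Override
                    public String apply(UserEvent event) {
                      return event.toString();  // illustrative formatting
                    }
                  })
              .withTempDirectory(baseDir));
    }
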
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index b57b28c..387e0ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -34,13 +34,13 @@ import org.apache.beam.sdk.util.MimeTypes;
  * '\n'} represented in {@code UTF-8} format as the record separator. Each record (including the
  * last) is terminated.
  */
-class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT> {
+class TextSink<UserT, DestinationT> extends FileBasedSink<UserT, DestinationT, String> {
   @Nullable private final String header;
   @Nullable private final String footer;
 
   TextSink(
       ValueProvider<ResourceId> baseOutputFilename,
-      DynamicDestinations<UserT, DestinationT> dynamicDestinations,
+      DynamicDestinations<UserT, DestinationT, String> dynamicDestinations,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
@@ -50,13 +50,13 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT>
   }
 
   @Override
-  public WriteOperation<String, DestinationT> createWriteOperation() {
+  public WriteOperation<DestinationT, String> createWriteOperation() {
     return new TextWriteOperation<>(this, header, footer);
   }
 
   /** A {@link WriteOperation WriteOperation} for text files. */
   private static class TextWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+      extends WriteOperation<DestinationT, String> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -67,20 +67,20 @@ class TextSink<UserT, DestinationT> extends FileBasedSink<String, DestinationT>
     }
 
     @Override
-    public Writer<String, DestinationT> createWriter() throws Exception {
+    public Writer<DestinationT, String> createWriter() throws Exception {
       return new TextWriter<>(this, header, footer);
     }
   }
 
   /** A {@link Writer Writer} for text files. */
-  private static class TextWriter<DestinationT> extends Writer<String, DestinationT> {
+  private static class TextWriter<DestinationT> extends Writer<DestinationT, String> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        WriteOperation<String, DestinationT> writeOperation,
+        WriteOperation<DestinationT, String> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

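TextSink now carries the user type and leaves record formatting to its DynamicDestinations. A
sketch of how the constant-destination case is assembled, mirroring resolveDynamicDestinations()
in the TextIO diff above (the output prefix is an illustrative assumption):

    ValueProvider<ResourceId> prefix =
        StaticValueProvider.of(
            FileBasedSink.convertToFileResourceIfPossible("/tmp/out/part"));
    FilenamePolicy policy =
        DefaultFilenamePolicy.fromStandardParameters(
            prefix, null /* shardTemplate */, ".txt" /* suffix */, false /* windowedWrites */);
    // The format function now travels with the destinations, not the sink.
    DynamicDestinations<String, Void, String> destinations =
        DynamicFileDestinations.constant(policy, SerializableFunctions.<String>identity());
    TextSink<String, Void> sink =
        new TextSink<>(
            prefix, destinations, null /* header */, null /* footer */,
            FileBasedSink.CompressionType.UNCOMPRESSED);
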
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index d8d7478..85c5652 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -60,7 +60,6 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -76,7 +75,9 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.ShardedKey;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
@@ -121,9 +122,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<OutputT, DestinationT> sink;
-  private SerializableFunction<UserT, OutputT> formatFunction;
-  private WriteOperation<OutputT, DestinationT> writeOperation;
+  private FileBasedSink<UserT, DestinationT, OutputT> sink;
+  private WriteOperation<DestinationT, OutputT> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
   @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
@@ -133,37 +133,44 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private final ValueProvider<Integer> numShardsProvider;
   private final boolean windowedWrites;
   private int maxNumWritersPerBundle;
+  // This is the set of side inputs used by this transform. This is usually populated by the user's
+  // DynamicDestinations object.
+  private final List<PCollectionView<?>> sideInputs;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
    * the runner control how many different shards are produced.
    */
   public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction) {
+      FileBasedSink<UserT, DestinationT, OutputT> sink) {
     checkNotNull(sink, "sink");
     return new WriteFiles<>(
         sink,
-        formatFunction,
         null /* runner-determined sharding */,
         null,
         false,
-        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE);
+        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
+        sink.getDynamicDestinations().getSideInputs());
   }
 
   private WriteFiles(
-      FileBasedSink<OutputT, DestinationT> sink,
-      SerializableFunction<UserT, OutputT> formatFunction,
+      FileBasedSink<UserT, DestinationT, OutputT> sink,
       @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
       @Nullable ValueProvider<Integer> numShardsProvider,
       boolean windowedWrites,
-      int maxNumWritersPerBundle) {
+      int maxNumWritersPerBundle,
+      List<PCollectionView<?>> sideInputs) {
     this.sink = sink;
-    this.formatFunction = checkNotNull(formatFunction);
     this.computeNumShards = computeNumShards;
     this.numShardsProvider = numShardsProvider;
     this.windowedWrites = windowedWrites;
     this.maxNumWritersPerBundle = maxNumWritersPerBundle;
+    this.sideInputs = sideInputs;
+  }
+
+  @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
   @Override
@@ -207,15 +214,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   }
 
   /** Returns the {@link FileBasedSink} associated with this PTransform. */
-  public FileBasedSink<OutputT, DestinationT> getSink() {
+  public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
     return sink;
   }
 
-  /** Returns the the format function that maps the user type to the record written to files. */
-  public SerializableFunction<UserT, OutputT> getFormatFunction() {
-    return formatFunction;
-  }
-
   /**
    * Returns whether or not to perform windowed writes.
    */
@@ -266,11 +268,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       ValueProvider<Integer> numShardsProvider) {
     return new WriteFiles<>(
         sink,
-        formatFunction,
         computeNumShards,
         numShardsProvider,
         windowedWrites,
-        maxNumWritersPerBundle);
+        maxNumWritersPerBundle,
+        sideInputs);
   }
 
   /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
@@ -278,11 +280,22 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       int maxNumWritersPerBundle) {
     return new WriteFiles<>(
         sink,
-        formatFunction,
         computeNumShards,
         numShardsProvider,
         windowedWrites,
-        maxNumWritersPerBundle);
+        maxNumWritersPerBundle,
+        sideInputs);
+  }
+
+  public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
+      List<PCollectionView<?>> sideInputs) {
+    return new WriteFiles<>(
+        sink,
+        computeNumShards,
+        numShardsProvider,
+        windowedWrites,
+        maxNumWritersPerBundle,
+        sideInputs);
   }
 
   /**
@@ -297,7 +310,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
     return new WriteFiles<>(
-        sink, formatFunction, sharding, null, windowedWrites, maxNumWritersPerBundle);
+        sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
   }
 
   /**
@@ -305,8 +318,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * runner-determined sharding.
    */
   public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(
-        sink, formatFunction, null, null, windowedWrites, maxNumWritersPerBundle);
+    return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
   }
 
   /**
@@ -323,7 +335,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    */
   public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
     return new WriteFiles<>(
-        sink, formatFunction, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle);
+        sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
   }
 
   private static class WriterKey<DestinationT> {
@@ -374,7 +386,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     private final Coder<DestinationT> destinationCoder;
     private final boolean windowedWrites;
 
-    private Map<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> writers;
+    private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
     WriteBundles(
@@ -394,6 +406,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       PaneInfo paneInfo = c.pane();
       // If we are doing windowed writes, we need to ensure that we have separate files for
       // data in different windows/panes. Similar for dynamic writes, make sure that different
@@ -402,7 +415,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // the map will only have a single element.
       DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
-      Writer<OutputT, DestinationT> writer = writers.get(key);
+      Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
         if (writers.size() <= maxNumWritersPerBundle) {
           String uuid = UUID.randomUUID().toString();
@@ -436,14 +449,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           return;
         }
       }
-      writeOrClose(writer, formatFunction.apply(c.element()));
+      writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element()));
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      for (Map.Entry<WriterKey<DestinationT>, Writer<OutputT, DestinationT>> entry :
+      for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
           writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
+        Writer<DestinationT, OutputT> writer = entry.getValue();
         FileResult<DestinationT> result;
         try {
           result = writer.close();
@@ -478,13 +491,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       // Since we key by a 32-bit hash of the destination, there might be multiple destinations
       // in this iterable. The number of destinations is generally very small (1000s or less), so
       // there will rarely be hash collisions.
-      Map<DestinationT, Writer<OutputT, DestinationT>> writers = Maps.newHashMap();
+      Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
       for (UserT input : c.element().getValue()) {
         DestinationT destination = sink.getDynamicDestinations().getDestination(input);
-        Writer<OutputT, DestinationT> writer = writers.get(destination);
+        Writer<DestinationT, OutputT> writer = writers.get(destination);
         if (writer == null) {
           LOG.debug("Opening writer for write operation {}", writeOperation);
           writer = writeOperation.createWriter();
@@ -501,12 +515,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           LOG.debug("Done opening writer");
           writers.put(destination, writer);
         }
-        writeOrClose(writer, formatFunction.apply(input));
-        }
+        writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
+      }
 
       // Close all writers.
-      for (Map.Entry<DestinationT, Writer<OutputT, DestinationT>> entry : writers.entrySet()) {
-        Writer<OutputT, DestinationT> writer = entry.getValue();
+      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
+        Writer<DestinationT, OutputT> writer = entry.getValue();
         FileResult<DestinationT> result;
         try {
           // Close the writer; if this throws let the error propagate.
@@ -526,8 +540,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private static <OutputT, DestinationT> void writeOrClose(
-      Writer<OutputT, DestinationT> writer, OutputT t) throws Exception {
+  private static <DestinationT, OutputT> void writeOrClose(
+      Writer<DestinationT, OutputT> writer, OutputT t) throws Exception {
     try {
       writer.write(t);
     } catch (Exception e) {
@@ -678,6 +692,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           input.apply(
               writeName,
               ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+                  .withSideInputs(sideInputs)
                   .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple
@@ -692,17 +707,18 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
                   "WriteUnwritten",
-                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)))
+                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
+                      .withSideInputs(sideInputs))
               .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
       results =
           PCollectionList.of(writtenBundleFiles)
               .and(writtenGroupedFiles)
               .apply(Flatten.<FileResult<DestinationT>>pCollections());
     } else {
-      List<PCollectionView<?>> sideInputs = Lists.newArrayList();
+      List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
         numShardsView = input.apply(computeNumShards);
-        sideInputs.add(numShardsView);
+        shardingSideInputs.add(numShardsView);
       } else {
         numShardsView = null;
       }
@@ -715,7 +731,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                               numShardsView,
                               (numShardsView != null) ? null : numShardsProvider,
                               destinationCoder))
-                      .withSideInputs(sideInputs))
+                      .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
               .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
       shardedWindowCoder =
@@ -728,7 +744,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       results =
           sharded.apply(
               "WriteShardedBundles",
-              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)));
+              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))
+                  .withSideInputs(sideInputs));
     }
     results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
 
@@ -773,11 +790,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } else {
       final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+      ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs =
           ImmutableList.<PCollectionView<?>>builder().add(resultsView);
       if (numShardsView != null) {
-        sideInputs.add(numShardsView);
+        finalizeSideInputs.add(numShardsView);
       }
+      finalizeSideInputs.addAll(sideInputs);
 
       // Finalize the write in another do-once ParDo on the singleton collection containing the
       // Writer. The results from the per-bundle writes are given as an Iterable side input.
@@ -794,7 +812,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   new DoFn<Void, Integer>() {
                     @ProcessElement
                     public void processElement(ProcessContext c) throws Exception {
-                      LOG.info("Finalizing write operation {}.", writeOperation);
+                      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
                      // We must always output at least 1 shard, and honor user-specified
                      // numShards if set.
@@ -827,7 +845,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                       writeOperation.removeTemporaryFiles(tempFiles);
                     }
                   })
-              .withSideInputs(sideInputs.build()));
+              .withSideInputs(finalizeSideInputs.build()));
     }
     return PDone.in(input.getPipeline());
   }
@@ -857,7 +875,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           minShardsNeeded,
           destination);
       for (int i = 0; i < extraShardsNeeded; ++i) {
-        Writer<OutputT, DestinationT> writer = writeOperation.createWriter();
+        Writer<DestinationT, OutputT> writer = writeOperation.createWriter();
         // Currently this code path is only called in the unwindowed case.
         writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
         FileResult<DestinationT> emptyWrite = writer.close();

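The WriteFiles changes have two visible effects: records are now formatted by the sink's
DynamicDestinations (formatRecord) instead of a separate format function, and any side inputs
declared by getSideInputs() are threaded into the write and finalize ParDos automatically. The
resulting call pattern, assuming a hypothetical mySink, MyEvent type, and input collection:

    // mySink is a hypothetical FileBasedSink<MyEvent, Void, String> whose
    // DynamicDestinations calls sideInput(...) in getDestination() or getFilenamePolicy().
    WriteFiles<MyEvent, Void, String> write =
        WriteFiles.to(mySink)  // side inputs come from mySink.getDynamicDestinations()
            .withNumShards(10);
    input.apply("WriteMyEvents", write);
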
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 154ff5a..a96b6be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,9 +30,11 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -41,6 +43,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
@@ -48,6 +51,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
@@ -55,6 +59,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
@@ -68,6 +73,7 @@ import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -77,6 +83,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -535,17 +542,147 @@ public class AvroIOTest {
     assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
   }
 
+  private static final String SCHEMA_TEMPLATE_STRING =
+      "{\"namespace\": \"example.avro\",\n"
+          + " \"type\": \"record\",\n"
+          + " \"name\": \"TestTemplateSchema$$\",\n"
+          + " \"fields\": [\n"
+          + "     {\"name\": \"$$full\", \"type\": \"string\"},\n"
+          + "     {\"name\": \"$$suffix\", \"type\": [\"string\", \"null\"]}\n"
+          + " ]\n"
+          + "}";
+
+  private static String schemaFromPrefix(String prefix) {
+    return SCHEMA_TEMPLATE_STRING.replace("$$", prefix);
+  }
+
+  private static GenericRecord createRecord(String record, String prefix, Schema schema) {
+    GenericRecord genericRecord = new GenericData.Record(schema);
+    genericRecord.put(prefix + "full", record);
+    genericRecord.put(prefix + "suffix", record.substring(1));
+    return genericRecord;
+  }
+
+  private static class TestDynamicDestinations
+      extends DynamicAvroDestinations<String, String, GenericRecord> {
+    ResourceId baseDir;
+    PCollectionView<Map<String, String>> schemaView;
+
+    TestDynamicDestinations(ResourceId baseDir, PCollectionView<Map<String, String>> schemaView) {
+      this.baseDir = baseDir;
+      this.schemaView = schemaView;
+    }
+
+    @Override
+    public Schema getSchema(String destination) {
+      // Return a per-destination schema.
+      String schema = sideInput(schemaView).get(destination);
+      return new Schema.Parser().parse(schema);
+    }
+
+    @Override
+    public List<PCollectionView<?>> getSideInputs() {
+      return ImmutableList.<PCollectionView<?>>of(schemaView);
+    }
+
+    @Override
+    public GenericRecord formatRecord(String record) {
+      String prefix = record.substring(0, 1);
+      return createRecord(record, prefix, getSchema(prefix));
+    }
+
+    @Override
+    public String getDestination(String element) {
+      // Destination is based on first character of string.
+      return element.substring(0, 1);
+    }
+
+    @Override
+    public String getDefaultDestination() {
+      return "";
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(String destination) {
+      return DefaultFilenamePolicy.fromStandardParameters(
+          StaticValueProvider.of(
+              baseDir.resolve("file_" + destination + ".txt", StandardResolveOptions.RESOLVE_FILE)),
+          null,
+          null,
+          false);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinations() throws Exception {
+    ResourceId baseDir =
+        FileSystems.matchNewResource(
+            Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
+                .toString(),
+            true);
+
+    List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
+    List<GenericRecord> expectedElements = Lists.newArrayListWithExpectedSize(elements.size());
+    Map<String, String> schemaMap = Maps.newHashMap();
+    for (String element : elements) {
+      String prefix = element.substring(0, 1);
+      String jsonSchema = schemaFromPrefix(prefix);
+      schemaMap.put(prefix, jsonSchema);
+      expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
+    }
+    PCollectionView<Map<String, String>> schemaView =
+        writePipeline
+            .apply("createSchemaView", Create.of(schemaMap))
+            .apply(View.<String, String>asMap());
+
+    PCollection<String> input =
+        writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
+    input.apply(
+        AvroIO.<String>writeCustomTypeToGenericRecords()
+            .to(new TestDynamicDestinations(baseDir, schemaView))
+            .withoutSharding()
+            .withTempDirectory(baseDir));
+    writePipeline.run();
+
+    // Validate that the data written matches the expected elements.
+
+    List<String> prefixes = Lists.newArrayList();
+    for (String element : elements) {
+      prefixes.add(element.substring(0, 1));
+    }
+    prefixes = ImmutableSet.copyOf(prefixes).asList();
+
+    List<GenericRecord> actualElements = new ArrayList<>();
+    for (String prefix : prefixes) {
+      File expectedFile =
+          new File(
+              baseDir
+                  .resolve(
+                      "file_" + prefix + ".txt-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+                  .toString());
+      assertTrue("Expected output file " + expectedFile.getAbsolutePath(), expectedFile.exists());
+      Schema schema = new Schema.Parser().parse(schemaFromPrefix(prefix));
+      try (DataFileReader<GenericRecord> reader =
+          new DataFileReader<>(expectedFile, new GenericDatumReader<GenericRecord>(schema))) {
+        Iterators.addAll(actualElements, reader);
+      }
+      expectedFile.delete();
+    }
+    assertThat(actualElements, containsInAnyOrder(expectedElements.toArray()));
+  }
+
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
     AvroIO.Write<String> write = AvroIO.write(String.class).to("/tmp/foo/baz");
-    assertEquals(CodecFactory.deflateCodec(6).toString(), write.getCodec().toString());
+    assertEquals(CodecFactory.deflateCodec(6).toString(), write.inner.getCodec().toString());
   }
 
   @Test
   public void testWriteWithCustomCodec() throws Exception {
     AvroIO.Write<String> write =
         AvroIO.write(String.class).to("/tmp/foo/baz").withCodec(CodecFactory.snappyCodec());
-    assertEquals(SNAPPY_CODEC, write.getCodec().toString());
+    assertEquals(SNAPPY_CODEC, write.inner.getCodec().toString());
   }
 
   @Test
@@ -556,7 +693,7 @@ public class AvroIOTest {
 
     assertEquals(
         CodecFactory.deflateCodec(9).toString(),
-        SerializableUtils.clone(write.getCodec()).getCodec().toString());
+        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -567,7 +704,7 @@ public class AvroIOTest {
 
     assertEquals(
         CodecFactory.xzCodec(9).toString(),
-        SerializableUtils.clone(write.getCodec()).getCodec().toString());
+        SerializableUtils.clone(write.inner.getCodec()).getCodec().toString());
   }
 
   @Test
@@ -618,7 +755,8 @@ public class AvroIOTest {
 
     String shardNameTemplate =
         firstNonNull(
-            write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
+            write.inner.getShardTemplate(),
+            DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -710,7 +848,13 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
     assertThat(displayData, hasDisplayItem("shardNameTemplate", "-SS-of-NN-"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
-    assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+    assertThat(
+        displayData,
+        hasDisplayItem(
+            "schema",
+            "{\"type\":\"record\",\"name\":\"GenericClass\",\"namespace\":\"org.apache.beam.sdk.io"
+                + ".AvroIOTest$\",\"fields\":[{\"name\":\"intField\",\"type\":\"int\"},"
+                + "{\"name\":\"stringField\",\"type\":\"string\"}]}"));
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }

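The new AvroIO test above exercises per-destination schemas delivered through a map side input.
For the common fixed-schema case no side inputs are needed; a condensed sketch, in which the
schema JSON, its string field "full", and the output path are illustrative assumptions:

    // Sketch: one destination, one schema, no side inputs.
    class SingleSchemaDestinations extends DynamicAvroDestinations<String, Void, GenericRecord> {
      private final String jsonSchema;  // assumed to declare a string field "full"
      private final ResourceId baseDir;

      SingleSchemaDestinations(String jsonSchema, ResourceId baseDir) {
        this.jsonSchema = jsonSchema;
        this.baseDir = baseDir;
      }

      @Override
      public GenericRecord formatRecord(String record) {
        GenericRecord result = new GenericData.Record(getSchema(null));
        result.put("full", record);
        return result;
      }

      @Override
      public Schema getSchema(Void destination) {
        return new Schema.Parser().parse(jsonSchema);
      }

      @Override
      public Void getDestination(String element) {
        return null;  // everything goes to the single destination
      }

      @Override
      public Void getDefaultDestination() {
        return null;
      }

      @Override
      public FilenamePolicy getFilenamePolicy(Void destination) {
        return DefaultFilenamePolicy.fromStandardParameters(
            StaticValueProvider.of(
                baseDir.resolve("out", StandardResolveOptions.RESOLVE_FILE)),
            null, null, false);
      }
    }
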
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index a6ad746..ff30e33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -231,7 +231,7 @@ public class FileBasedSinkTest {
         SimpleSink.makeSimpleSink(
             getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
 
-    WriteOperation<String, Void> writeOp =
+    WriteOperation<Void, String> writeOp =
         new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
@@ -482,11 +482,11 @@ public class FileBasedSinkTest {
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    WriteOperation<String, Void> writeOp =
+    WriteOperation<Void, String> writeOp =
         SimpleSink.makeSimpleSink(
                 root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final Writer<String, Void> writer = writeOp.createWriter();
+    final Writer<Void, String> writer = writeOp.createWriter();
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 

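For reference, the lifecycle these tests drive, written against the reordered
Writer<DestinationT, OutputT> signature (sink is assumed to be a SimpleSink as above; the lines
written are illustrative):

    WriteOperation<Void, String> writeOp = sink.createWriteOperation();
    Writer<Void, String> writer = writeOp.createWriter();
    // -1 is WriteFiles.UNKNOWN_SHARDNUM; the destination is simply null for Void.
    writer.openUnwindowed(UUID.randomUUID().toString(), -1, null);
    writer.write("first line");
    writer.write("second line");
    FileResult<Void> result = writer.close();  // records the temporary file written
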
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 9196178..382898d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -28,10 +28,10 @@ import org.apache.beam.sdk.util.MimeTypes;
 /**
  * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
  */
-class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
+class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT, String> {
   public SimpleSink(
       ResourceId tempDirectory,
-      DynamicDestinations<String, DestinationT> dynamicDestinations,
+      DynamicDestinations<String, DestinationT, String> dynamicDestinations,
       WritableByteChannelFactory writableByteChannelFactory) {
     super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
   }
@@ -50,7 +50,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
       String shardTemplate,
       String suffix,
       WritableByteChannelFactory writableByteChannelFactory) {
-    DynamicDestinations<String, Void> dynamicDestinations =
+    DynamicDestinations<String, Void, String> dynamicDestinations =
         DynamicFileDestinations.constant(
             DefaultFilenamePolicy.fromParams(
                 new Params()
@@ -67,7 +67,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
   }
 
   static final class SimpleWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+      extends WriteOperation<DestinationT, String> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -82,7 +82,7 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
     }
   }
 
-  static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
+  static final class SimpleWriter<DestinationT> extends Writer<DestinationT, String> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";
 

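A sketch of driving the updated test sink end to end through WriteFiles (baseDirectory and the
pipeline p are assumptions):

    SimpleSink<Void> sink =
        SimpleSink.makeSimpleSink(
            baseDirectory, "file", "-SS-of-NN", ".simple",
            FileBasedSink.CompressionType.UNCOMPRESSED);
    p.apply(Create.of("a", "b", "c").withCoder(StringUtf8Coder.of()))
        .apply(WriteFiles.to(sink).withNumShards(1));
    p.run();
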
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index a73ed7d..7f80c26 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -110,7 +110,8 @@ public class TextIOWriteTest {
         });
   }
 
-  static class TestDynamicDestinations extends FileBasedSink.DynamicDestinations<String, String> {
+  static class TestDynamicDestinations
+      extends FileBasedSink.DynamicDestinations<String, String, String> {
     ResourceId baseDir;
 
     TestDynamicDestinations(ResourceId baseDir) {
@@ -118,6 +119,11 @@ public class TextIOWriteTest {
     }
 
     @Override
+    public String formatRecord(String record) {
+      return record;
+    }
+
+    @Override
     public String getDestination(String element) {
       // Destination is based on first character of string.
       return element.substring(0, 1);
@@ -169,10 +175,7 @@ public class TextIOWriteTest {
 
     List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
     PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
-    input.apply(
-        TextIO.write()
-            .to(new TestDynamicDestinations(baseDir))
-            .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
+    input.apply(TextIO.write().to(new TestDynamicDestinations(baseDir)).withTempDirectory(baseDir));
     p.run();
 
     assertOutputFiles(
@@ -268,8 +271,14 @@ public class TextIOWriteTest {
             new UserWriteType("caab", "sixth"));
     PCollection<UserWriteType> input = p.apply(Create.of(elements));
     input.apply(
-        TextIO.writeCustomType(new SerializeUserWrite())
-            .to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params())
+        TextIO.<UserWriteType>writeCustomType()
+            .to(
+                new UserWriteDestination(baseDir),
+                new DefaultFilenamePolicy.Params()
+                    .withBaseFilename(
+                        baseDir.resolve(
+                            "empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))
+            .withFormatFunction(new SerializeUserWrite())
             .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
     p.run();
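
The String-specialized TextIO.write() offers the same destination-function overload and installs
the identity format function internally (see Write#to in the TextIO diff above). A sketch, with
baseDir and the pipeline p as assumptions:

    p.apply(Create.of("a1", "a2", "b1").withCoder(StringUtf8Coder.of()))
        .apply(
            TextIO.write()
                .to(
                    new SerializableFunction<String, DefaultFilenamePolicy.Params>() {
                      @Override
                      public DefaultFilenamePolicy.Params apply(String line) {
                        // Route each line to a file named after its first character.
                        return new DefaultFilenamePolicy.Params()
                            .withBaseFilename(
                                baseDir.resolve(
                                    line.substring(0, 1),
                                    ResolveOptions.StandardResolveOptions.RESOLVE_FILE));
                      }
                    },
                    new DefaultFilenamePolicy.Params()
                        .withBaseFilename(
                            baseDir.resolve(
                                "empty", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))
                .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
    p.run();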