Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/29 00:49:42 UTC

[3/4] beam git commit: [BEAM-92] Supports DynamicDestinations in AvroIO.

[BEAM-92] Supports DynamicDestinations in AvroIO.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f2622fa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f2622fa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f2622fa

Branch: refs/heads/master
Commit: 9f2622fa19da1284222e872fdcd63b086bdc3509
Parents: 1f2634d
Author: Reuven Lax <re...@google.com>
Authored: Thu Jul 6 20:22:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Jul 28 17:28:12 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ParDoTranslation.java     |   2 +-
 .../construction/WriteFilesTranslation.java     |  81 ++--
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../construction/WriteFilesTranslationTest.java |  26 +-
 .../direct/WriteWithShardingFactory.java        |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   8 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  10 +-
 .../src/main/proto/beam_runner_api.proto        |   2 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 436 +++++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  93 ++--
 .../beam/sdk/io/ConstantAvroDestination.java    | 130 ++++++
 .../beam/sdk/io/DefaultFilenamePolicy.java      |   1 -
 .../beam/sdk/io/DynamicAvroDestinations.java    |  46 ++
 .../beam/sdk/io/DynamicFileDestinations.java    |  59 ++-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 121 +++--
 .../java/org/apache/beam/sdk/io/TFRecordIO.java |  23 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 228 ++++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  14 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 116 ++---
 .../java/org/apache/beam/sdk/io/AvroIOTest.java | 156 ++++++-
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |   6 +-
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  10 +-
 .../org/apache/beam/sdk/io/TextIOWriteTest.java |  23 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  74 ++--
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |   4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |   8 +-
 27 files changed, 1214 insertions(+), 491 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index d7b0e9f..5765c51 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -484,7 +484,7 @@ public class ParDoTranslation {
         });
   }
 
-  private static SideInput toProto(PCollectionView<?> view) {
+  public static SideInput toProto(PCollectionView<?> view) {
     Builder builder = SideInput.newBuilder();
     builder.setAccessPattern(
         FunctionSpec.newBuilder()

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index b1d2da4..7954b0e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -19,29 +19,35 @@
 package org.apache.beam.runners.core.construction;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.service.AutoService;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.WriteFilesPayload;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Utility methods for translating a {@link WriteFiles} to and from {@link RunnerApi}
@@ -53,28 +59,25 @@ public class WriteFilesTranslation {
   public static final String CUSTOM_JAVA_FILE_BASED_SINK_URN =
       "urn:beam:file_based_sink:javasdk:0.1";
 
-  public static final String CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN =
-      "urn:beam:file_based_sink_format_function:javasdk:0.1";
-
   @VisibleForTesting
   static WriteFilesPayload toProto(WriteFiles<?, ?, ?> transform) {
+    Map<String, SideInput> sideInputs = Maps.newHashMap();
+    for (PCollectionView<?> view : transform.getSink().getDynamicDestinations().getSideInputs()) {
+      sideInputs.put(view.getTagInternal().getId(), ParDoTranslation.toProto(view));
+    }
     return WriteFilesPayload.newBuilder()
         .setSink(toProto(transform.getSink()))
-        .setFormatFunction(toProto(transform.getFormatFunction()))
         .setWindowedWrites(transform.isWindowedWrites())
         .setRunnerDeterminedSharding(
             transform.getNumShards() == null && transform.getSharding() == null)
+        .putAllSideInputs(sideInputs)
         .build();
   }
 
-  private static SdkFunctionSpec toProto(FileBasedSink<?, ?> sink) {
+  private static SdkFunctionSpec toProto(FileBasedSink<?, ?, ?> sink) {
     return toProto(CUSTOM_JAVA_FILE_BASED_SINK_URN, sink);
   }
 
-  private static SdkFunctionSpec toProto(SerializableFunction<?, ?> serializableFunction) {
-    return toProto(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN, serializableFunction);
-  }
-
   private static SdkFunctionSpec toProto(String urn, Serializable serializable) {
     return SdkFunctionSpec.newBuilder()
         .setSpec(
@@ -91,7 +94,7 @@ public class WriteFilesTranslation {
   }
 
   @VisibleForTesting
-  static FileBasedSink<?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
+  static FileBasedSink<?, ?, ?> sinkFromProto(SdkFunctionSpec sinkProto) throws IOException {
     checkArgument(
         sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_URN),
         "Cannot extract %s instance from %s with URN %s",
@@ -102,44 +105,44 @@ public class WriteFilesTranslation {
     byte[] serializedSink =
         sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
 
-    return (FileBasedSink<?, ?>)
+    return (FileBasedSink<?, ?, ?>)
         SerializableUtils.deserializeFromByteArray(
             serializedSink, FileBasedSink.class.getSimpleName());
   }
 
-  @VisibleForTesting
-  static <InputT, OutputT> SerializableFunction<InputT, OutputT> formatFunctionFromProto(
-      SdkFunctionSpec sinkProto) throws IOException {
-    checkArgument(
-        sinkProto.getSpec().getUrn().equals(CUSTOM_JAVA_FILE_BASED_SINK_FORMAT_FUNCTION_URN),
-        "Cannot extract %s instance from %s with URN %s",
-        SerializableFunction.class.getSimpleName(),
-        FunctionSpec.class.getSimpleName(),
-        sinkProto.getSpec().getUrn());
-
-    byte[] serializedFunction =
-        sinkProto.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray();
-
-    return (SerializableFunction<InputT, OutputT>)
-        SerializableUtils.deserializeFromByteArray(
-            serializedFunction, FileBasedSink.class.getSimpleName());
-  }
-
-  public static <UserT, DestinationT, OutputT> FileBasedSink<OutputT, DestinationT> getSink(
+  public static <UserT, DestinationT, OutputT> FileBasedSink<UserT, DestinationT, OutputT> getSink(
       AppliedPTransform<PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
           transform)
       throws IOException {
-    return (FileBasedSink<OutputT, DestinationT>)
+    return (FileBasedSink<UserT, DestinationT, OutputT>)
         sinkFromProto(getWriteFilesPayload(transform).getSink());
   }
 
-  public static <InputT, OutputT> SerializableFunction<InputT, OutputT> getFormatFunction(
-      AppliedPTransform<
-              PCollection<InputT>, PDone, ? extends PTransform<PCollection<InputT>, PDone>>
-          transform)
-      throws IOException {
-    return formatFunctionFromProto(
-        getWriteFilesPayload(transform).<InputT, OutputT>getFormatFunction());
+  public static <UserT, DestinationT, OutputT>
+      List<PCollectionView<?>> getDynamicDestinationSideInputs(
+          AppliedPTransform<
+                  PCollection<UserT>, PDone, ? extends PTransform<PCollection<UserT>, PDone>>
+              transform)
+          throws IOException {
+    SdkComponents sdkComponents = SdkComponents.create();
+    RunnerApi.PTransform transformProto = PTransformTranslation.toProto(transform, sdkComponents);
+    List<PCollectionView<?>> views = Lists.newArrayList();
+    Map<String, SideInput> sideInputs = getWriteFilesPayload(transform).getSideInputsMap();
+    for (Map.Entry<String, SideInput> entry : sideInputs.entrySet()) {
+      PCollection<?> originalPCollection =
+          checkNotNull(
+              (PCollection<?>) transform.getInputs().get(new TupleTag<>(entry.getKey())),
+              "no input with tag %s",
+              entry.getKey());
+      views.add(
+          ParDoTranslation.viewFromProto(
+              entry.getValue(),
+              entry.getKey(),
+              originalPCollection,
+              transformProto,
+              RehydratedComponents.forComponents(sdkComponents.toComponents())));
+    }
+    return views;
   }
 
   public static <T> boolean isWindowedWrites(

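The side inputs serialized above are the ones a sink's DynamicDestinations declares via
getSideInputs(); each is keyed by its tag id so that getDynamicDestinationSideInputs() can match
it back to the corresponding input of the transform. A minimal sketch of the consuming side (the
same pattern appears in the WriteWithShardingFactory and DataflowRunner hunks below; the variable
names here are illustrative):

    // Rebuild a WriteFiles replacement from the translated transform.
    FileBasedSink sink = WriteFilesTranslation.getSink(transform);
    List<PCollectionView<?>> sideInputs =
        WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
    WriteFiles<?, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
    if (WriteFilesTranslation.isWindowedWrites(transform)) {
      replacement = replacement.withWindowedWrites();
    }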
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 316645b..1862699 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -549,15 +548,14 @@ public class PTransformMatchersTest implements Serializable {
             false);
     WriteFiles<Integer, Void, Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer, Void>(
+            new FileBasedSink<Integer, Void, Integer>(
                 StaticValueProvider.of(outputDirectory),
-                DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
+                DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
               @Override
-              public WriteOperation<Integer, Void> createWriteOperation() {
+              public WriteOperation<Void, Integer> createWriteOperation() {
                 return null;
               }
-            },
-            SerializableFunctions.<Integer>identity());
+            });
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 4259ac8..e067fac 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -63,12 +62,11 @@ public class WriteFilesTranslationTest {
   public static class TestWriteFilesPayloadTranslation {
     @Parameters(name = "{index}: {0}")
     public static Iterable<WriteFiles<Object, Void, Object>> data() {
-      SerializableFunction<Object, Object> format = SerializableFunctions.constant(null);
       return ImmutableList.of(
-          WriteFiles.to(new DummySink(), format),
-          WriteFiles.to(new DummySink(), format).withWindowedWrites(),
-          WriteFiles.to(new DummySink(), format).withNumShards(17),
-          WriteFiles.to(new DummySink(), format).withWindowedWrites().withNumShards(42));
+          WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
+          WriteFiles.to(new DummySink()).withNumShards(17),
+          WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
     }
 
     @Parameter(0)
@@ -87,7 +85,8 @@ public class WriteFilesTranslationTest {
       assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
 
       assertThat(
-          (FileBasedSink<String, Void>) WriteFilesTranslation.sinkFromProto(payload.getSink()),
+          (FileBasedSink<String, Void, String>)
+              WriteFilesTranslation.sinkFromProto(payload.getSink()),
           equalTo(writeFiles.getSink()));
     }
 
@@ -118,16 +117,17 @@ public class WriteFilesTranslationTest {
    * A simple {@link FileBasedSink} for testing serialization/deserialization. Not mocked to avoid
    * any issues serializing mocks.
    */
-  private static class DummySink extends FileBasedSink<Object, Void> {
+  private static class DummySink extends FileBasedSink<Object, Void, Object> {
 
     DummySink() {
       super(
           StaticValueProvider.of(FileSystems.matchNewResource("nowhere", false)),
-          DynamicFileDestinations.constant(new DummyFilenamePolicy()));
+          DynamicFileDestinations.constant(
+              new DummyFilenamePolicy(), SerializableFunctions.constant(null)));
     }
 
     @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
+    public WriteOperation<Void, Object> createWriteOperation() {
       return new DummyWriteOperation(this);
     }
 
@@ -152,13 +152,13 @@ public class WriteFilesTranslationTest {
     }
   }
 
-  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Object, Void> {
-    public DummyWriteOperation(FileBasedSink<Object, Void> sink) {
+  private static class DummyWriteOperation extends FileBasedSink.WriteOperation<Void, Object> {
+    public DummyWriteOperation(FileBasedSink<Object, Void, Object> sink) {
       super(sink);
     }
 
     @Override
-    public FileBasedSink.Writer<Object, Void> createWriter() throws Exception {
+    public FileBasedSink.Writer<Void, Object> createWriter() throws Exception {
       throw new UnsupportedOperationException("Should never be called.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index ba796ae..3557c5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -24,10 +24,12 @@ import com.google.common.base.Suppliers;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.WriteFiles;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -61,10 +63,10 @@ class WriteWithShardingFactory<InputT>
       AppliedPTransform<PCollection<InputT>, PDone, PTransform<PCollection<InputT>, PDone>>
           transform) {
     try {
-      WriteFiles<InputT, ?, ?> replacement =
-          WriteFiles.to(
-              WriteFilesTranslation.getSink(transform),
-              WriteFilesTranslation.getFormatFunction(transform));
+      List<PCollectionView<?>> sideInputs =
+          WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+      FileBasedSink sink = WriteFilesTranslation.getSink(transform);
+      WriteFiles<InputT, ?, ?> replacement = WriteFiles.to(sink).withSideInputs(sideInputs);
       if (WriteFilesTranslation.isWindowedWrites(transform)) {
         replacement = replacement.withWindowedWrites();
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 6dd069c..d0db44e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -143,15 +142,14 @@ public class WriteWithShardingFactoryTest implements Serializable {
 
     PTransform<PCollection<Object>, PDone> original =
         WriteFiles.to(
-            new FileBasedSink<Object, Void>(
+            new FileBasedSink<Object, Void, Object>(
                 StaticValueProvider.of(outputDirectory),
                 DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
               @Override
-              public WriteOperation<Object, Void> createWriteOperation() {
+              public WriteOperation<Void, Object> createWriteOperation() {
                 throw new IllegalArgumentException("Should not be used");
               }
-            },
-            SerializableFunctions.identity());
+            });
     @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 762ac9f..f8d2c3c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -92,6 +92,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.extensions.gcp.storage.PathValidator;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -1501,10 +1502,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       }
 
       try {
+        List<PCollectionView<?>> sideInputs =
+            WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
+        FileBasedSink sink = WriteFilesTranslation.getSink(transform);
         WriteFiles<UserT, DestinationT, OutputT> replacement =
-            WriteFiles.<UserT, DestinationT, OutputT>to(
-                WriteFilesTranslation.<UserT, DestinationT, OutputT>getSink(transform),
-                WriteFilesTranslation.<UserT, OutputT>getFormatFunction(transform));
+            WriteFiles.to(sink).withSideInputs(sideInputs);
         if (WriteFilesTranslation.isWindowedWrites(transform)) {
           replacement = replacement.withWindowedWrites();
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7556a28..9db73c6 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1271,8 +1271,7 @@ public class DataflowRunnerTest implements Serializable {
 
     StreamingShardedWriteFactory<Object, Void, Object> factory =
         new StreamingShardedWriteFactory<>(p.getOptions());
-    WriteFiles<Object, Void, Object> original =
-        WriteFiles.to(new TestSink(tmpFolder.toString()), SerializableFunctions.identity());
+    WriteFiles<Object, Void, Object> original = WriteFiles.to(new TestSink(tmpFolder.toString()));
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
     AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object, Void, Object>>
         originalApplication =
@@ -1290,7 +1289,7 @@ public class DataflowRunnerTest implements Serializable {
     assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
   }
 
-  private static class TestSink extends FileBasedSink<Object, Void> {
+  private static class TestSink extends FileBasedSink<Object, Void, Object> {
     @Override
     public void validate(PipelineOptions options) {}
 
@@ -1315,11 +1314,12 @@ public class DataflowRunnerTest implements Serializable {
                     int shardNumber, int numShards, OutputFileHints outputFileHints) {
                   throw new UnsupportedOperationException("should not be called");
                 }
-              }));
+              },
+              SerializableFunctions.identity()));
     }
 
     @Override
-    public WriteOperation<Object, Void> createWriteOperation() {
+    public WriteOperation<Void, Object> createWriteOperation() {
       throw new IllegalArgumentException("Should not be used");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 42e2601..9afb565 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -375,6 +375,8 @@ message WriteFilesPayload {
   bool windowed_writes = 3;
 
   bool runner_determined_sharding = 4;
+
+  map<string, SideInput> side_inputs = 5;
 }
 
 // A coder, the binary format for serialization and deserialization of data in

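The new side_inputs field makes the payload self-describing with respect to the sink's side
inputs. A small sketch of the round trip, using only the generated accessors exercised by this
commit (sinkSpec and sideInputs stand for the values built in WriteFilesTranslation.toProto
above):

    WriteFilesPayload payload =
        WriteFilesPayload.newBuilder()
            .setSink(sinkSpec)            // SdkFunctionSpec wrapping the serialized sink
            .setWindowedWrites(false)
            .setRunnerDeterminedSharding(true)
            .putAllSideInputs(sideInputs) // tag id -> SideInput proto
            .build();
    // Consumers read the map back with payload.getSideInputsMap().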
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 27c9073..824f725 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -25,7 +25,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import com.google.common.io.BaseEncoding;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
@@ -40,7 +39,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -51,7 +49,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -161,6 +158,51 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     .withSuffix(".avro"));
  * }</pre>
  *
+ * <p>The following shows a more complex example of AvroIO.Write usage, generating dynamic file
+ * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
+ * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
+ * as an integer field. We want events for each user to go into a user-specific directory, and each
+ * user's data should be written with a user-specific schema; a side input supplies the schemas, so
+ * they can be computed in a different stage of the pipeline.
+ *
+ * <pre>{@code
+ * // This is the user class that controls dynamic destinations for this Avro write. The input to
+ * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
+ * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
+ * // of Integer.
+ * class UserDynamicAvroDestinations
+ *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
+ *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
+ *   public UserDynamicAvroDestinations(PCollectionView<Map<Integer, String>> userToSchemaMap) {
+ *     this.userToSchemaMap = userToSchemaMap;
+ *   }
+ *   public GenericRecord formatRecord(UserEvent record) {
+ *     return formatUserRecord(record, getSchema(record.getUserId()));
+ *   }
+ *   public Schema getSchema(Integer userId) {
+ *     return new Schema.Parser().parse(sideInput(userToSchemaMap).get(userId));
+ *   }
+ *   public Integer getDestination(UserEvent record) {
+ *     return record.getUserId();
+ *   }
+ *   public Integer getDefaultDestination() {
+ *     return 0;
+ *   }
+ *   public FilenamePolicy getFilenamePolicy(Integer userId) {
+ *     return DefaultFilenamePolicy.fromParams(new Params().withBaseFilename(baseDir + "/user-"
+ *     + userId + "/events"));
+ *   }
+ *   public List<PCollectionView<?>> getSideInputs() {
+ *     return ImmutableList.<PCollectionView<?>>of(userToSchemaMap);
+ *   }
+ * }
+ * PCollection<UserEvent> events = ...;
+ * PCollectionView<Map<Integer, String>> userToSchemaMap = events.apply(
+ *     "ComputeSchemas", new ComputePerUserSchemas());
+ * events.apply("WriteAvros", AvroIO.<UserEvent>writeCustomTypeToGenericRecords()
+ *     .to(new UserDynamicAvroDestinations(userToSchemaMap)));
+ * }</pre>
+ *
  * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
  * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
  * overridden using {@link AvroIO.Write#withCodec}.
@@ -256,18 +298,53 @@ public class AvroIO {
    * pattern).
    */
   public static <T> Write<T> write(Class<T> recordClass) {
-    return AvroIO.<T>defaultWriteBuilder()
-        .setRecordClass(recordClass)
-        .setSchema(ReflectData.get().getSchema(recordClass))
-        .build();
+    return new Write<>(
+        AvroIO.<T, T>defaultWriteBuilder()
+            .setGenericRecords(false)
+            .setSchema(ReflectData.get().getSchema(recordClass))
+            .build());
   }
 
   /** Writes Avro records of the specified schema. */
   public static Write<GenericRecord> writeGenericRecords(Schema schema) {
-    return AvroIO.<GenericRecord>defaultWriteBuilder()
-        .setRecordClass(GenericRecord.class)
-        .setSchema(schema)
-        .build();
+    return new Write<>(
+        AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
+            .setGenericRecords(true)
+            .setSchema(schema)
+            .build());
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to an Avro file (or multiple Avro files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * record of type OutputT.
+   *
+   * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
+   * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
+   * that will be written to the file must be specified. If using a custom {@link
+   * DynamicAvroDestinations} object this is done using {@link
+   * DynamicAvroDestinations#formatRecord}, otherwise the {@link
+   * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function.
+   *
+   * <p>The advantage of using a custom type is that it allows a user-provided {@link
+   * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)}, to
+   * examine the custom type when choosing a destination.
+   *
+   * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
+   * instead.
+   */
+  public static <UserT, OutputT> TypedWrite<UserT, OutputT> writeCustomType() {
+    return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
+  }
+
+  /**
+   * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
+   * {@link GenericRecord}. A schema must be specified either in {@link
+   * DynamicAvroDestinations#getSchema} or, if not using dynamic destinations, by using {@link
+   * TypedWrite#withSchema(Schema)}.
+   */
+  public static <UserT> TypedWrite<UserT, GenericRecord> writeCustomTypeToGenericRecords() {
+    return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
   }
 
   /**
@@ -277,12 +354,12 @@ public class AvroIO {
     return writeGenericRecords(new Schema.Parser().parse(schema));
   }
 
-  private static <T> Write.Builder<T> defaultWriteBuilder() {
-    return new AutoValue_AvroIO_Write.Builder<T>()
+  private static <UserT, OutputT> TypedWrite.Builder<UserT, OutputT> defaultWriteBuilder() {
+    return new AutoValue_AvroIO_TypedWrite.Builder<UserT, OutputT>()
         .setFilenameSuffix(null)
         .setShardTemplate(null)
         .setNumShards(0)
-        .setCodec(Write.DEFAULT_CODEC)
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
         .setMetadata(ImmutableMap.<String, Object>of())
         .setWindowedWrites(false);
   }
@@ -572,15 +649,18 @@ public class AvroIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
-    private static final SerializableAvroCodecFactory DEFAULT_CODEC =
-        new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
-    // This should be a multiple of 4 to not get a partial encoded byte.
-    private static final int METADATA_BYTES_MAX_LENGTH = 40;
+  public abstract static class TypedWrite<UserT, OutputT>
+      extends PTransform<PCollection<UserT>, PDone> {
+    static final CodecFactory DEFAULT_CODEC = CodecFactory.deflateCodec(6);
+    static final SerializableAvroCodecFactory DEFAULT_SERIALIZABLE_CODEC =
+        new SerializableAvroCodecFactory(DEFAULT_CODEC);
+
+    @Nullable
+    abstract SerializableFunction<UserT, OutputT> getFormatFunction();
 
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
     @Nullable abstract String getShardTemplate();
@@ -590,11 +670,16 @@ public class AvroIO {
     abstract ValueProvider<ResourceId> getTempDirectory();
 
     abstract int getNumShards();
-    @Nullable abstract Class<T> getRecordClass();
+
+    abstract boolean getGenericRecords();
+
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
 
+    @Nullable
+    abstract DynamicAvroDestinations<UserT, ?, OutputT> getDynamicDestinations();
+
     /**
      * The codec used to encode the blocks in the Avro file. String value drawn from those in
      * https://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/file/CodecFactory.html
@@ -603,25 +688,39 @@ public class AvroIO {
     /** Avro file metadata. */
     abstract ImmutableMap<String, Object> getMetadata();
 
-    abstract Builder<T> toBuilder();
+    abstract Builder<UserT, OutputT> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
-      abstract Builder<T> setFilenameSuffix(String filenameSuffix);
+    abstract static class Builder<UserT, OutputT> {
+      abstract Builder<UserT, OutputT> setFormatFunction(
+          SerializableFunction<UserT, OutputT> formatFunction);
 
-      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+      abstract Builder<UserT, OutputT> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
 
-      abstract Builder<T> setNumShards(int numShards);
-      abstract Builder<T> setShardTemplate(String shardTemplate);
-      abstract Builder<T> setRecordClass(Class<T> recordClass);
-      abstract Builder<T> setSchema(Schema schema);
-      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
-      abstract Builder<T> setFilenamePolicy(FilenamePolicy filenamePolicy);
-      abstract Builder<T> setCodec(SerializableAvroCodecFactory codec);
-      abstract Builder<T> setMetadata(ImmutableMap<String, Object> metadata);
+      abstract Builder<UserT, OutputT> setFilenameSuffix(String filenameSuffix);
+
+      abstract Builder<UserT, OutputT> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+      abstract Builder<UserT, OutputT> setNumShards(int numShards);
+
+      abstract Builder<UserT, OutputT> setShardTemplate(String shardTemplate);
+
+      abstract Builder<UserT, OutputT> setGenericRecords(boolean genericRecords);
 
-      abstract Write<T> build();
+      abstract Builder<UserT, OutputT> setSchema(Schema schema);
+
+      abstract Builder<UserT, OutputT> setWindowedWrites(boolean windowedWrites);
+
+      abstract Builder<UserT, OutputT> setFilenamePolicy(FilenamePolicy filenamePolicy);
+
+      abstract Builder<UserT, OutputT> setCodec(SerializableAvroCodecFactory codec);
+
+      abstract Builder<UserT, OutputT> setMetadata(ImmutableMap<String, Object> metadata);
+
+      abstract Builder<UserT, OutputT> setDynamicDestinations(
+          DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations);
+
+      abstract TypedWrite<UserT, OutputT> build();
     }
 
     /**
@@ -635,7 +734,7 @@ public class AvroIO {
      * common suffix (if supplied using {@link #withSuffix(String)}). This default can be overridden
      * using {@link #to(FilenamePolicy)}.
      */
-    public Write<T> to(String outputPrefix) {
+    public TypedWrite<UserT, OutputT> to(String outputPrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
     }
 
@@ -658,14 +757,12 @@ public class AvroIO {
      * infer a directory for temporary files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> to(ResourceId outputPrefix) {
+    public TypedWrite<UserT, OutputT> to(ResourceId outputPrefix) {
       return toResource(StaticValueProvider.of(outputPrefix));
     }
 
-    /**
-     * Like {@link #to(String)}.
-     */
-    public Write<T> to(ValueProvider<String> outputPrefix) {
+    /** Like {@link #to(String)}. */
+    public TypedWrite<UserT, OutputT> to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -675,11 +772,9 @@ public class AvroIO {
           }));
     }
 
-    /**
-     * Like {@link #to(ResourceId)}.
-     */
+    /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+    public TypedWrite<UserT, OutputT> toResource(ValueProvider<ResourceId> outputPrefix) {
       return toBuilder().setFilenamePrefix(outputPrefix).build();
     }
 
@@ -687,16 +782,52 @@ public class AvroIO {
      * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
      * directory for temporary files must be specified using {@link #withTempDirectory}.
      */
-    public Write<T> to(FilenamePolicy filenamePolicy) {
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> to(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
+    /**
+     * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These
+     * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+     * temporary files must be specified using {@link #withTempDirectory}.
+     */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> to(
+        DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations) {
+      return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+    }
+
+    /**
+     * Sets the output schema. Can only be used when the output type is {@link GenericRecord}
+     * and when not using {@link #to(DynamicAvroDestinations)}.
+     */
+    public TypedWrite<UserT, OutputT> withSchema(Schema schema) {
+      return toBuilder().setSchema(schema).build();
+    }
+
+    /**
+     * Specifies a format function to convert {@link UserT} to the output type. If {@link
+     * #to(DynamicAvroDestinations)} is used, {@link DynamicAvroDestinations#formatRecord} must be
+     * used instead.
+     */
+    public TypedWrite<UserT, OutputT> withFormatFunction(
+        SerializableFunction<UserT, OutputT> formatFunction) {
+      return toBuilder().setFormatFunction(formatFunction).build();
+    }
+
     /** Set the base directory used to generate temporary files. */
     @Experimental(Kind.FILESYSTEM)
-    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+    public TypedWrite<UserT, OutputT> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
       return toBuilder().setTempDirectory(tempDirectory).build();
     }
 
+    /** Set the base directory used to generate temporary files. */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<UserT, OutputT> withTempDirectory(ResourceId tempDirectory) {
+      return withTempDirectory(StaticValueProvider.of(tempDirectory));
+    }
+
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
      * used when using one of the default filename-prefix to() overrides.
@@ -704,7 +835,7 @@ public class AvroIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write<T> withShardNameTemplate(String shardTemplate) {
+    public TypedWrite<UserT, OutputT> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
@@ -715,7 +846,7 @@ public class AvroIO {
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write<T> withSuffix(String filenameSuffix) {
+    public TypedWrite<UserT, OutputT> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
@@ -729,7 +860,7 @@ public class AvroIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public Write<T> withNumShards(int numShards) {
+    public TypedWrite<UserT, OutputT> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
@@ -744,7 +875,7 @@ public class AvroIO {
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public Write<T> withoutSharding() {
+    public TypedWrite<UserT, OutputT> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -754,12 +885,12 @@ public class AvroIO {
     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}, filenames will be generated using
      * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
      */
-    public Write<T> withWindowedWrites() {
+    public TypedWrite<UserT, OutputT> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
     /** Writes to Avro file(s) compressed using specified codec. */
-    public Write<T> withCodec(CodecFactory codec) {
+    public TypedWrite<UserT, OutputT> withCodec(CodecFactory codec) {
       return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
     }
 
@@ -768,7 +899,7 @@ public class AvroIO {
      *
      * <p>Supported value types are String, Long, and byte[].
      */
-    public Write<T> withMetadata(Map<String, Object> metadata) {
+    public TypedWrite<UserT, OutputT> withMetadata(Map<String, Object> metadata) {
       Map<String, String> badKeys = Maps.newLinkedHashMap();
       for (Map.Entry<String, Object> entry : metadata.entrySet()) {
         Object v = entry.getValue();
@@ -783,18 +914,31 @@ public class AvroIO {
       return toBuilder().setMetadata(ImmutableMap.copyOf(metadata)).build();
     }
 
-    DynamicDestinations<T, Void> resolveDynamicDestinations() {
-      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-      if (usedFilenamePolicy == null) {
-        usedFilenamePolicy =
-            DefaultFilenamePolicy.fromStandardParameters(
-                getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+    DynamicAvroDestinations<UserT, ?, OutputT> resolveDynamicDestinations() {
+      DynamicAvroDestinations<UserT, ?, OutputT> dynamicDestinations = getDynamicDestinations();
+      if (dynamicDestinations == null) {
+        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+        if (usedFilenamePolicy == null) {
+          usedFilenamePolicy =
+              DefaultFilenamePolicy.fromStandardParameters(
+                  getFilenamePrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getWindowedWrites());
+        }
+        dynamicDestinations =
+            constantDestinations(
+                usedFilenamePolicy,
+                getSchema(),
+                getMetadata(),
+                getCodec().getCodec(),
+                getFormatFunction());
       }
-      return DynamicFileDestinations.constant(usedFilenamePolicy);
+      return dynamicDestinations;
     }
 
     @Override
-    public PDone expand(PCollection<T> input) {
+    public PDone expand(PCollection<UserT> input) {
       checkArgument(
           getFilenamePrefix() != null || getTempDirectory() != null,
           "Need to set either the filename prefix or the tempDirectory of a AvroIO.Write "
@@ -805,24 +949,25 @@ public class AvroIO {
             "shardTemplate and filenameSuffix should only be used with the default "
                 + "filename policy");
       }
+      if (getDynamicDestinations() != null) {
+        checkArgument(
+            getFormatFunction() == null,
+            "A format function should not be specified "
+                + "with DynamicDestinations. Use DynamicDestinations.formatRecord instead");
+      }
+
       return expandTyped(input, resolveDynamicDestinations());
     }
 
     public <DestinationT> PDone expandTyped(
-        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+        PCollection<UserT> input,
+        DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations) {
       ValueProvider<ResourceId> tempDirectory = getTempDirectory();
       if (tempDirectory == null) {
         tempDirectory = getFilenamePrefix();
       }
-      WriteFiles<T, DestinationT, T> write =
-          WriteFiles.to(
-              new AvroSink<>(
-                  tempDirectory,
-                  dynamicDestinations,
-                  AvroCoder.of(getRecordClass(), getSchema()),
-                  getCodec(),
-                  getMetadata()),
-              SerializableFunctions.<T>identity());
+      WriteFiles<UserT, DestinationT, OutputT> write =
+          WriteFiles.to(new AvroSink<>(tempDirectory, dynamicDestinations, getGenericRecords()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -845,33 +990,11 @@ public class AvroIO {
                 : getTempDirectory().toString();
       }
       builder
-          .add(DisplayData.item("schema", getRecordClass()).withLabel("Record Schema"))
           .addIfNotDefault(
               DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
-          .addIfNotDefault(
-              DisplayData.item("codec", getCodec().toString()).withLabel("Avro Compression Codec"),
-              DEFAULT_CODEC.toString())
           .addIfNotNull(
               DisplayData.item("tempDirectory", tempDirectory)
                   .withLabel("Directory for temporary files"));
-      builder.include("Metadata", new Metadata());
-    }
-
-    private class Metadata implements HasDisplayData {
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
-          DisplayData.Type type = DisplayData.inferType(entry.getValue());
-          if (type != null) {
-            builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
-          } else {
-            String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
-            String repr = base64.length() <= METADATA_BYTES_MAX_LENGTH
-                ? base64 : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
-            builder.add(DisplayData.item(entry.getKey(), repr));
-          }
-        }
-      }
     }
 
     @Override
@@ -880,6 +1003,131 @@ public class AvroIO {
     }
   }
 
+  /**
+   * This class is used as the default return value of {@link AvroIO#write}.
+   *
+   * <p>All methods in this class delegate to the appropriate method of {@link AvroIO.TypedWrite}.
+   * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+   */
+  public static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    @VisibleForTesting TypedWrite<T, T> inner;
+
+    Write(TypedWrite<T, T> inner) {
+      this.inner = inner;
+    }
+
+    /** See {@link TypedWrite#to(String)}. */
+    public Write<T> to(String outputPrefix) {
+      return new Write<>(
+          inner
+              .to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix))
+              .withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(ResourceId)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> to(ResourceId outputPrefix) {
+      return new Write<T>(
+          inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(ValueProvider)}. */
+    public Write<T> to(ValueProvider<String> outputPrefix) {
+      return new Write<>(
+          inner.to(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#toResource(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+      return new Write<>(
+          inner.toResource(outputPrefix).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(FilenamePolicy)}. */
+    public Write<T> to(FilenamePolicy filenamePolicy) {
+      return new Write<>(
+          inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity()));
+    }
+
+    /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */
+    public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
+      return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
+    }
+
+    /** See {@link TypedWrite#withSchema}. */
+    public Write<T> withSchema(Schema schema) {
+      return new Write<>(inner.withSchema(schema));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+      return new Write<>(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+    public Write<T> withTempDirectory(ResourceId tempDirectory) {
+      return new Write<>(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withShardNameTemplate}. */
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return new Write<>(inner.withShardNameTemplate(shardTemplate));
+    }
+
+    /** See {@link TypedWrite#withSuffix}. */
+    public Write<T> withSuffix(String filenameSuffix) {
+      return new Write<>(inner.withSuffix(filenameSuffix));
+    }
+
+    /** See {@link TypedWrite#withNumShards}. */
+    public Write<T> withNumShards(int numShards) {
+      return new Write<>(inner.withNumShards(numShards));
+    }
+
+    /** See {@link TypedWrite#withoutSharding}. */
+    public Write<T> withoutSharding() {
+      return new Write<>(inner.withoutSharding());
+    }
+
+    /** See {@link TypedWrite#withWindowedWrites}. */
+    public Write<T> withWindowedWrites() {
+      return new Write<>(inner.withWindowedWrites());
+    }
+
+    /** See {@link TypedWrite#withCodec}. */
+    public Write<T> withCodec(CodecFactory codec) {
+      return new Write<>(inner.withCodec(codec));
+    }
+
+    /** See {@link TypedWrite#withMetadata}. */
+    public Write<T> withMetadata(Map<String, Object> metadata) {
+      return new Write<>(inner.withMetadata(metadata));
+    }
+
+    @Override
+    public PDone expand(PCollection<T> input) {
+      return inner.expand(input);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      inner.populateDisplayData(builder);
+    }
+  }
+
+  /**
+   * Returns a {@link DynamicAvroDestinations} that always returns the same {@link FilenamePolicy},
+   * schema, metadata, and codec.
+   */
+  public static <UserT, OutputT> DynamicAvroDestinations<UserT, Void, OutputT> constantDestinations(
+      FilenamePolicy filenamePolicy,
+      Schema schema,
+      Map<String, Object> metadata,
+      CodecFactory codec,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    return new ConstantAvroDestination<>(filenamePolicy, schema, metadata, codec, formatFunction);
+  }
   /////////////////////////////////////////////////////////////////////////////
 
   /** Disallow construction of utility class. */

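To make the new AvroIO surface concrete, here is a hedged end-to-end sketch of the non-dynamic
typed-write path (ClickEvent, the schema literal, and the output path are illustrative, not part
of this commit). Since no DynamicAvroDestinations is supplied, resolveDynamicDestinations() falls
back to constantDestinations, so every file shares the schema given to withSchema:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.SerializableFunction;

    public class TypedAvroWriteExample {
      // Illustrative user type; not part of the commit.
      static class ClickEvent implements java.io.Serializable {
        final int userId;
        ClickEvent(int userId) { this.userId = userId; }
      }

      // Avro's Schema is not Serializable here, so the format function keeps
      // the JSON string and parses it lazily on the worker.
      static final String SCHEMA_JSON =
          "{\"type\":\"record\",\"name\":\"Click\","
              + "\"fields\":[{\"name\":\"userId\",\"type\":\"int\"}]}";

      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of(new ClickEvent(1), new ClickEvent(2))
                .withCoder(SerializableCoder.of(ClickEvent.class)))
            .apply(
                AvroIO.<ClickEvent>writeCustomTypeToGenericRecords()
                    // Converts each ClickEvent to the GenericRecord actually written.
                    .withFormatFunction(
                        new SerializableFunction<ClickEvent, GenericRecord>() {
                          private transient Schema schema;

                          @Override
                          public GenericRecord apply(ClickEvent event) {
                            if (schema == null) {
                              schema = new Schema.Parser().parse(SCHEMA_JSON);
                            }
                            GenericRecord record = new GenericData.Record(schema);
                            record.put("userId", event.userId);
                            return record;
                          }
                        })
                    // One schema for all files; resolves to constantDestinations internally.
                    .withSchema(new Schema.Parser().parse(SCHEMA_JSON))
                    .to("/tmp/clicks"));
        p.run().waitUntilFinish();
      }
    }

To route each record to a per-user directory and schema instead, pass a DynamicAvroDestinations
to to(...), as in the UserDynamicAvroDestinations example in the class javadoc above.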
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index c78870b..acd3ea6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -17,93 +17,90 @@
  */
 package org.apache.beam.sdk.io;
 
-import com.google.common.collect.ImmutableMap;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
-class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
-  private final AvroCoder<T> coder;
-  private final SerializableAvroCodecFactory codec;
-  private final ImmutableMap<String, Object> metadata;
+class AvroSink<UserT, DestinationT, OutputT> extends FileBasedSink<UserT, DestinationT, OutputT> {
+  private final DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations;
+  private final boolean genericRecords;
 
   AvroSink(
       ValueProvider<ResourceId> outputPrefix,
-      DynamicDestinations<T, DestinationT> dynamicDestinations,
-      AvroCoder<T> coder,
-      SerializableAvroCodecFactory codec,
-      ImmutableMap<String, Object> metadata) {
+      DynamicAvroDestinations<UserT, DestinationT, OutputT> dynamicDestinations,
+      boolean genericRecords) {
     // Avro handles compression internally using the codec.
     super(outputPrefix, dynamicDestinations, CompressionType.UNCOMPRESSED);
-    this.coder = coder;
-    this.codec = codec;
-    this.metadata = metadata;
+    this.dynamicDestinations = dynamicDestinations;
+    this.genericRecords = genericRecords;
   }
 
   @Override
-  public WriteOperation<T, DestinationT> createWriteOperation() {
-    return new AvroWriteOperation<>(this, coder, codec, metadata);
+  public DynamicAvroDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+    return (DynamicAvroDestinations<UserT, DestinationT, OutputT>) super.getDynamicDestinations();
+  }
+
+  @Override
+  public WriteOperation<DestinationT, OutputT> createWriteOperation() {
+    return new AvroWriteOperation<>(this, genericRecords);
   }
 
   /** A {@link WriteOperation WriteOperation} for Avro files. */
-  private static class AvroWriteOperation<T, DestinationT> extends WriteOperation<T, DestinationT> {
-    private final AvroCoder<T> coder;
-    private final SerializableAvroCodecFactory codec;
-    private final ImmutableMap<String, Object> metadata;
+  private static class AvroWriteOperation<DestinationT, OutputT>
+      extends WriteOperation<DestinationT, OutputT> {
+    private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+    private final boolean genericRecords;
 
-    private AvroWriteOperation(
-        AvroSink<T, DestinationT> sink,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+    private AvroWriteOperation(AvroSink<?, DestinationT, OutputT> sink, boolean genericRecords) {
       super(sink);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
+      this.dynamicDestinations = sink.getDynamicDestinations();
+      this.genericRecords = genericRecords;
     }
 
     @Override
-    public Writer<T, DestinationT> createWriter() throws Exception {
-      return new AvroWriter<>(this, coder, codec, metadata);
+    public Writer<DestinationT, OutputT> createWriter() throws Exception {
+      return new AvroWriter<>(this, dynamicDestinations, genericRecords);
     }
   }
 
   /** A {@link Writer Writer} for Avro files. */
-  private static class AvroWriter<T, DestinationT> extends Writer<T, DestinationT> {
-    private final AvroCoder<T> coder;
-    private DataFileWriter<T> dataFileWriter;
-    private SerializableAvroCodecFactory codec;
-    private final ImmutableMap<String, Object> metadata;
+  private static class AvroWriter<DestinationT, OutputT> extends Writer<DestinationT, OutputT> {
+    private DataFileWriter<OutputT> dataFileWriter;
+    private final DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations;
+    private final boolean genericRecords;
 
     public AvroWriter(
-        WriteOperation<T, DestinationT> writeOperation,
-        AvroCoder<T> coder,
-        SerializableAvroCodecFactory codec,
-        ImmutableMap<String, Object> metadata) {
+        WriteOperation<DestinationT, OutputT> writeOperation,
+        DynamicAvroDestinations<?, DestinationT, ?> dynamicDestinations,
+        boolean genericRecords) {
       super(writeOperation, MimeTypes.BINARY);
-      this.coder = coder;
-      this.codec = codec;
-      this.metadata = metadata;
+      this.dynamicDestinations = dynamicDestinations;
+      this.genericRecords = genericRecords;
     }
 
     @SuppressWarnings("deprecation") // uses internal test functionality.
     @Override
     protected void prepareWrite(WritableByteChannel channel) throws Exception {
-      DatumWriter<T> datumWriter = coder.getType().equals(GenericRecord.class)
-          ? new GenericDatumWriter<T>(coder.getSchema())
-          : new ReflectDatumWriter<T>(coder.getSchema());
+      DestinationT destination = getDestination();
+      CodecFactory codec = dynamicDestinations.getCodec(destination);
+      Schema schema = dynamicDestinations.getSchema(destination);
+      Map<String, Object> metadata = dynamicDestinations.getMetadata(destination);
 
-      dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec.getCodec());
+      DatumWriter<OutputT> datumWriter =
+          genericRecords
+              ? new GenericDatumWriter<OutputT>(schema)
+              : new ReflectDatumWriter<OutputT>(schema);
+      dataFileWriter = new DataFileWriter<>(datumWriter).setCodec(codec);
       for (Map.Entry<String, Object> entry : metadata.entrySet()) {
         Object v = entry.getValue();
         if (v instanceof String) {
@@ -118,11 +115,11 @@ class AvroSink<T, DestinationT> extends FileBasedSink<T, DestinationT> {
                   + v.getClass().getSimpleName());
         }
       }
-      dataFileWriter.create(coder.getSchema(), Channels.newOutputStream(channel));
+      dataFileWriter.create(schema, Channels.newOutputStream(channel));
     }
 
     @Override
-    public void write(T value) throws Exception {
+    public void write(OutputT value) throws Exception {
       dataFileWriter.append(value);
     }
 

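The prepareWrite logic above reduces to standard Avro DataFileWriter usage, now parameterized per destination. A standalone, non-Beam sketch of the same writer setup (the schema and path are invented for illustration):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class DataFileWriterSketch {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\","
            + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    // GenericDatumWriter corresponds to the genericRecords == true branch in
    // AvroWriter; ReflectDatumWriter would be used for reflect-based types.
    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema))
            .setCodec(CodecFactory.deflateCodec(6));
    // File-header metadata, as prepareWrite sets from getMetadata(destination).
    writer.setMeta("origin", "sketch");
    writer.create(schema, new File("/tmp/rec.avro"));
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("name", "a");
    writer.append(rec);
    writer.close();
  }
}
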
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
new file mode 100644
index 0000000..b006e26
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ConstantAvroDestination.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+
+/** Always returns a constant {@link FilenamePolicy}, {@link Schema}, metadata, and codec. */
+class ConstantAvroDestination<UserT, OutputT>
+    extends DynamicAvroDestinations<UserT, Void, OutputT> {
+  private static class SchemaFunction implements Serializable, Function<String, Schema> {
+    @Nullable
+    @Override
+    public Schema apply(@Nullable String input) {
+      return new Schema.Parser().parse(input);
+    }
+  }
+
+  // Keep this a multiple of 4 so the truncated base64 display string never
+  // ends in a partial encoded byte group.
+  private static final int METADATA_BYTES_MAX_LENGTH = 40;
+  private final FilenamePolicy filenamePolicy;
+  private final Supplier<Schema> schema;
+  private final Map<String, Object> metadata;
+  private final SerializableAvroCodecFactory codec;
+  private final SerializableFunction<UserT, OutputT> formatFunction;
+
+  private class Metadata implements HasDisplayData {
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
+        DisplayData.Type type = DisplayData.inferType(entry.getValue());
+        if (type != null) {
+          builder.add(DisplayData.item(entry.getKey(), type, entry.getValue()));
+        } else {
+          String base64 = BaseEncoding.base64().encode((byte[]) entry.getValue());
+          String repr =
+              base64.length() <= METADATA_BYTES_MAX_LENGTH
+                  ? base64
+                  : base64.substring(0, METADATA_BYTES_MAX_LENGTH) + "...";
+          builder.add(DisplayData.item(entry.getKey(), repr));
+        }
+      }
+    }
+  }
+
+  public ConstantAvroDestination(
+      FilenamePolicy filenamePolicy,
+      Schema schema,
+      Map<String, Object> metadata,
+      CodecFactory codec,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    this.filenamePolicy = filenamePolicy;
+    this.schema = Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString()));
+    this.metadata = metadata;
+    this.codec = new SerializableAvroCodecFactory(codec);
+    this.formatFunction = formatFunction;
+  }
+
+  @Override
+  public OutputT formatRecord(UserT record) {
+    return formatFunction.apply(record);
+  }
+
+  @Override
+  public Void getDestination(UserT element) {
+    return (Void) null;
+  }
+
+  @Override
+  public Void getDefaultDestination() {
+    return (Void) null;
+  }
+
+  @Override
+  public FilenamePolicy getFilenamePolicy(Void destination) {
+    return filenamePolicy;
+  }
+
+  @Override
+  public Schema getSchema(Void destination) {
+    return schema.get();
+  }
+
+  @Override
+  public Map<String, Object> getMetadata(Void destination) {
+    return metadata;
+  }
+
+  @Override
+  public CodecFactory getCodec(Void destination) {
+    return codec.getCodec();
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    filenamePolicy.populateDisplayData(builder);
+    builder.add(DisplayData.item("schema", schema.get().toString()).withLabel("Record Schema"));
+    builder.addIfNotDefault(
+        DisplayData.item("codec", codec.getCodec().toString()).withLabel("Avro Compression Codec"),
+        AvroIO.TypedWrite.DEFAULT_SERIALIZABLE_CODEC.toString());
+    builder.include("Metadata", new Metadata());
+  }
+}

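One detail worth noting: Avro's Schema is not Serializable, which is why ConstantAvroDestination stores it behind Suppliers.compose(new SchemaFunction(), Suppliers.ofInstance(schema.toString())); the schema travels as its JSON string and is re-parsed on access. The same trick in isolation (the holder class name is illustrative):

import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.io.Serializable;
import org.apache.avro.Schema;

/** Illustrative holder that keeps a non-Serializable Schema serializable. */
class SerializableSchemaHolder implements Serializable {
  private static class ParseFn implements Serializable, Function<String, Schema> {
    @Override
    public Schema apply(String json) {
      return new Schema.Parser().parse(json);
    }
  }

  // The composed Supplier is serializable because both the Function and the
  // instance Supplier (which holds only a String) are serializable. Like
  // ConstantAvroDestination, it re-parses on each get() rather than memoizing.
  private final Supplier<Schema> schema;

  SerializableSchemaHolder(Schema schema) {
    this.schema = Suppliers.compose(new ParseFn(), Suppliers.ofInstance(schema.toString()));
  }

  Schema get() {
    return schema.get();
  }
}
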
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 4021609..1f438d5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -157,7 +157,6 @@ public final class DefaultFilenamePolicy extends FilenamePolicy {
           && shardTemplate.equals(other.shardTemplate)
           && suffix.equals(other.suffix);
     }
-
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)

http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
new file mode 100644
index 0000000..f4e8ee6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicAvroDestinations.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+
+/**
+ * A specialization of {@link DynamicDestinations} for {@link AvroIO}. In addition to dynamic file
+ * destinations, this allows specifying other Avro properties (schema, metadata, codec) per
+ * destination.
+ */
+public abstract class DynamicAvroDestinations<UserT, DestinationT, OutputT>
+    extends DynamicDestinations<UserT, DestinationT, OutputT> {
+  /** Returns the Avro schema to use for a given destination. */
+  public abstract Schema getSchema(DestinationT destination);
+
+  /** Returns the Avro file metadata to write for a given destination. */
+  public Map<String, Object> getMetadata(DestinationT destination) {
+    return ImmutableMap.<String, Object>of();
+  }
+
+  /** Returns the Avro codec to use for a given destination. */
+  public CodecFactory getCodec(DestinationT destination) {
+    return AvroIO.TypedWrite.DEFAULT_CODEC;
+  }
+}

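To make the new extension point concrete, here is a hedged sketch of a subclass that routes events to per-type files with per-type schemas. Event, the generated schemas, and the /tmp paths are invented for illustration, and it assumes the Params-based DefaultFilenamePolicy.fromParams factory from the recent filename-policy refactor (Params is referenced via ParamsCoder below):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.DefaultFilenamePolicy;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.DynamicAvroDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;

public class PerTypeDestinations
    extends DynamicAvroDestinations<PerTypeDestinations.Event, String, GenericRecord> {
  /** Illustrative input type; "type" doubles as the destination key. */
  public static class Event {
    public String type;    // e.g. "click" or "view"
    public String payload;
  }

  private static Schema schemaFor(String type) {
    // One record schema per event type.
    return SchemaBuilder.record(type + "_event").fields()
        .requiredString("payload").endRecord();
  }

  @Override
  public GenericRecord formatRecord(Event record) {
    GenericRecord out = new GenericData.Record(schemaFor(record.type));
    out.put("payload", record.payload);
    return out;
  }

  @Override
  public String getDestination(Event element) {
    return element.type;
  }

  @Override
  public String getDefaultDestination() {
    return "unknown";
  }

  @Override
  public FilenamePolicy getFilenamePolicy(String destination) {
    return DefaultFilenamePolicy.fromParams(
        new Params()
            .withBaseFilename(FileBasedSink.convertToFileResourceIfPossible(
                "/tmp/events/" + destination + "/part"))
            .withSuffix(".avro"));
  }

  @Override
  public Schema getSchema(String destination) {
    return schemaFor(destination);
  }

  // getDestinationCoder() is left to the default, letting Beam infer a coder
  // for the String destination type.
}

An instance like this is what the AvroIO changes in this commit accept for dynamic writes.
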
http://git-wip-us.apache.org/repos/asf/beam/blob/9f2622fa/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index d05a01a7..b087bc5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import javax.annotation.Nullable;
@@ -28,20 +27,30 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 
 /** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
 public class DynamicFileDestinations {
   /** Always returns a constant {@link FilenamePolicy}. */
-  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+  private static class ConstantFilenamePolicy<UserT, OutputT>
+      extends DynamicDestinations<UserT, Void, OutputT> {
     private final FilenamePolicy filenamePolicy;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
-    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
-      this.filenamePolicy = checkNotNull(filenamePolicy);
+    public ConstantFilenamePolicy(
+        FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+      this.filenamePolicy = filenamePolicy;
+      this.formatFunction = formatFunction;
     }
 
     @Override
-    public Void getDestination(T element) {
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
+    }
+
+    @Override
+    public Void getDestination(UserT element) {
       return (Void) null;
     }
 
@@ -71,14 +80,24 @@ public class DynamicFileDestinations {
    * A base class for a {@link DynamicDestinations} object that returns differently-configured
    * instances of {@link DefaultFilenamePolicy}.
    */
-  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
-    SerializableFunction<UserT, Params> destinationFunction;
-    Params emptyDestination;
+  private static class DefaultPolicyDestinations<UserT, OutputT>
+      extends DynamicDestinations<UserT, Params, OutputT> {
+    private final SerializableFunction<UserT, Params> destinationFunction;
+    private final Params emptyDestination;
+    private final SerializableFunction<UserT, OutputT> formatFunction;
 
     public DefaultPolicyDestinations(
-        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+        SerializableFunction<UserT, Params> destinationFunction,
+        Params emptyDestination,
+        SerializableFunction<UserT, OutputT> formatFunction) {
       this.destinationFunction = destinationFunction;
       this.emptyDestination = emptyDestination;
+      this.formatFunction = formatFunction;
+    }
+
+    @Override
+    public OutputT formatRecord(UserT record) {
+      return formatFunction.apply(record);
     }
 
     @Override
@@ -104,16 +123,28 @@ public class DynamicFileDestinations {
   }
 
   /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
-  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
-    return new ConstantFilenamePolicy<>(filenamePolicy);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Void, OutputT> constant(
+      FilenamePolicy filenamePolicy, SerializableFunction<UserT, OutputT> formatFunction) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, formatFunction);
+  }
+
+  /**
+   * A specialization of {@link #constant(FilenamePolicy, SerializableFunction)} for the case where
+   * UserT and OutputT are the same type and the format function is the identity.
+   */
+  public static <UserT> DynamicDestinations<UserT, Void, UserT> constant(
+      FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy, SerializableFunctions.<UserT>identity());
   }
 
   /**
    * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
    * configured with the given {@link Params}.
    */
-  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
-      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
-    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  public static <UserT, OutputT> DynamicDestinations<UserT, Params, OutputT> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction,
+      Params emptyDestination,
+      SerializableFunction<UserT, OutputT> formatFunction) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination, formatFunction);
   }
 }
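
Finally, a hedged sketch of the generalized toDefaultPolicies factory above, routing each KV's value to a per-key location. The paths are illustrative, and Params.withBaseFilename/withSuffix are assumed from the Params API referenced via ParamsCoder above:

import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.DynamicFileDestinations;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;

public class PerKeyTextDestinations {
  private static Params paramsFor(String key) {
    return new Params()
        .withBaseFilename(FileBasedSink.convertToFileResourceIfPossible(
            "/tmp/out/" + key + "/part"))
        .withSuffix(".txt");
  }

  public static DynamicDestinations<KV<String, String>, Params, String> create() {
    return DynamicFileDestinations.toDefaultPolicies(
        // Destination function: per-key Params for DefaultFilenamePolicy.
        new SerializableFunction<KV<String, String>, Params>() {
          @Override
          public Params apply(KV<String, String> kv) {
            return paramsFor(kv.getKey());
          }
        },
        // Fallback destination used when the input collection is empty.
        paramsFor("empty"),
        // Format function: write only the value as the output record.
        new SerializableFunction<KV<String, String>, String>() {
          @Override
          public String apply(KV<String, String> kv) {
            return kv.getValue();
          }
        });
  }
}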