You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/11 01:24:06 UTC

[3/5] beam git commit: Adds DynamicDestinations support to FileBasedSink

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
new file mode 100644
index 0000000..e7ef0f6
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io;
+
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.ParamsCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Some helper classes that derive from {@link FileBasedSink.DynamicDestinations}. */
+public class DynamicFileDestinations {
+  /** Always returns a constant {@link FilenamePolicy}. */
+  private static class ConstantFilenamePolicy<T> extends DynamicDestinations<T, Void> {
+    private final FilenamePolicy filenamePolicy;
+
+    public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
+      this.filenamePolicy = filenamePolicy;
+    }
+
+    @Override
+    public Void getDestination(T element) {
+      return (Void) null;
+    }
+
+    @Override
+    public Coder<Void> getDestinationCoder() {
+      return null;
+    }
+
+    @Override
+    public Void getDefaultDestination() {
+      return (Void) null;
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(Void destination) {
+      return filenamePolicy;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      filenamePolicy.populateDisplayData(builder);
+    }
+  }
+
+  /**
+   * A base class for a {@link DynamicDestinations} object that returns differently-configured
+   * instances of {@link DefaultFilenamePolicy}.
+   */
+  private static class DefaultPolicyDestinations<UserT> extends DynamicDestinations<UserT, Params> {
+    SerializableFunction<UserT, Params> destinationFunction;
+    Params emptyDestination;
+
+    public DefaultPolicyDestinations(
+        SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+      this.destinationFunction = destinationFunction;
+      this.emptyDestination = emptyDestination;
+    }
+
+    @Override
+    public Params getDestination(UserT element) {
+      return destinationFunction.apply(element);
+    }
+
+    @Override
+    public Params getDefaultDestination() {
+      return emptyDestination;
+    }
+
+    @Nullable
+    @Override
+    public Coder<Params> getDestinationCoder() {
+      return ParamsCoder.of();
+    }
+
+    @Override
+    public FilenamePolicy getFilenamePolicy(DefaultFilenamePolicy.Params params) {
+      return DefaultFilenamePolicy.fromParams(params);
+    }
+  }
+
+  /** Returns a {@link DynamicDestinations} that always returns the same {@link FilenamePolicy}. */
+  public static <T> DynamicDestinations<T, Void> constant(FilenamePolicy filenamePolicy) {
+    return new ConstantFilenamePolicy<>(filenamePolicy);
+  }
+
+  /**
+   * Returns a {@link DynamicDestinations} that returns instances of {@link DefaultFilenamePolicy}
+   * configured with the given {@link Params}.
+   */
+  public static <UserT> DynamicDestinations<UserT, Params> toDefaultPolicies(
+      SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
+    return new DefaultPolicyDestinations<>(destinationFunction, emptyDestination);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 8102316..583af60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.reflect.TypeVariable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -49,8 +50,10 @@ import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -73,6 +76,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
@@ -82,43 +86,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
- * output and defines the format of output files (how values are written, headers/footers, MIME
- * type, etc.).
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based output
+ * and defines the format of output files (how values are written, headers/footers, MIME type,
+ * etc.).
  *
  * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
  * and to create a {@link WriteOperation} that manages the process of writing to the sink.
  *
  * <p>The process of writing to file-based sink is as follows:
+ *
  * <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
+ *   <li>An optional subclass-defined initialization,
+ *   <li>a parallel write of bundles to temporary files, and finally,
+ *   <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed}
- * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
- * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
- * identifying
- * their output.
+ * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
+ * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
+ * transform, so even redundant or retried bundles will have a unique way of identifying their
+ * output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
  * will encode the unique bundle id to avoid conflicts with other writers.
  *
- * {@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
- * filenames, and this policy object can be used to write windowed or triggered
- * PCollections into separate files per window pane. This allows file output from unbounded
- * PCollections, and also works for bounded PCollecctions.
+ * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered PCollections into
+ * separate files per window pane. This allows file output from unbounded PCollections, and also
+ * works for bounded PCollecctions.
  *
  * <p>Supported file systems are those registered with {@link FileSystems}.
  *
- * @param <T> the type of values written to the sink.
+ * @param <OutputT> the type of values written to the sink.
  */
 @Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
   /**
@@ -173,7 +177,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
-    public String getFilenameSuffix() {
+    public String getSuggestedFilenameSuffix() {
       return filenameSuffix;
     }
 
@@ -205,6 +209,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
+  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
+
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
    * underlying channel. The default is to not compress the output using
@@ -213,8 +219,70 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private final WritableByteChannelFactory writableByteChannelFactory;
 
   /**
-   * A naming policy for output files.
+   * A class that allows value-dependent writes in {@link FileBasedSink}.
+   *
+   * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this
+   * destination type into an instance of {@link FilenamePolicy}.
    */
+  @Experimental(Kind.FILESYSTEM)
+  public abstract static class DynamicDestinations<UserT, DestinationT>
+      implements HasDisplayData, Serializable {
+    /**
+     * Returns an object that represents at a high level the destination being written to. May not
+     * return null.
+     */
+    public abstract DestinationT getDestination(UserT element);
+
+    /**
+     * Returns the default destination. This is used for collections that have no elements as the
+     * destination to write empty files to.
+     */
+    public abstract DestinationT getDefaultDestination();
+
+    /**
+     * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder
+     * registry will be use to find a suitable coder. This must be a deterministic coder, as {@link
+     * DestinationT} will be used as a key type in a {@link
+     * org.apache.beam.sdk.transforms.GroupByKey}.
+     */
+    @Nullable
+    public Coder<DestinationT> getDestinationCoder() {
+      return null;
+    }
+
+    /** Converts a destination into a {@link FilenamePolicy}. May not return null. */
+    public abstract FilenamePolicy getFilenamePolicy(DestinationT destination);
+
+    /** Populates the display data. */
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {}
+
+    // Gets the destination coder. If the user does not provide one, try to find one in the coder
+    // registry. If no coder can be found, throws CannotProvideCoderException.
+    final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
+        throws CannotProvideCoderException {
+      Coder<DestinationT> destinationCoder = getDestinationCoder();
+      if (destinationCoder != null) {
+        return destinationCoder;
+      }
+      // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
+      // We must first use reflection to figure out what the type parameter is.
+      TypeDescriptor<?> superDescriptor =
+          TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
+      if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
+        throw new AssertionError(
+            "Couldn't find the DynamicDestinations superclass of " + this.getClass());
+      }
+      TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
+      @SuppressWarnings("unchecked")
+      TypeDescriptor<DestinationT> descriptor =
+          (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
+      return registry.getCoder(descriptor);
+    }
+  }
+
+  /** A naming policy for output files. */
+  @Experimental(Kind.FILESYSTEM)
   public abstract static class FilenamePolicy implements Serializable {
     /**
      * Context used for generating a name based on shard number, and num shards.
@@ -287,29 +355,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     /**
      * When a sink has requested windowed or triggered output, this method will be invoked to return
      * the file {@link ResourceId resource} to be created given the base output directory and a
-     * (possibly empty) extension from {@link FileBasedSink} configuration
-     * (e.g., {@link CompressionType}).
+     * {@link OutputFileHints} containing information about the file, including a suggested
+     * extension (e.g. coming from {@link CompressionType}).
      *
-     * <p>The {@link WindowedContext} object gives access to the window and pane,
-     * as well as sharding information. The policy must return unique and consistent filenames
-     * for different windows and panes.
+     * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
+     * sharding information. The policy must return unique and consistent filenames for different
+     * windows and panes.
      */
     @Experimental(Kind.FILESYSTEM)
-    public abstract ResourceId windowedFilename(
-        ResourceId outputDirectory, WindowedContext c, String extension);
+    public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
 
     /**
      * When a sink has not requested windowed or triggered output, this method will be invoked to
      * return the file {@link ResourceId resource} to be created given the base output directory and
-     * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
-     * (e.g., {@link CompressionType}).
+     * a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
+     * coming from {@link CompressionType}).
      *
      * <p>The {@link Context} object only provides sharding information, which is used by the policy
      * to generate unique and consistent filenames.
      */
     @Experimental(Kind.FILESYSTEM)
-    @Nullable public abstract ResourceId unwindowedFilename(
-        ResourceId outputDirectory, Context c, String extension);
+    @Nullable
+    public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
 
     /**
      * Populates the display data.
@@ -318,19 +385,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
-  /** The policy used to generate names of files to be produced. */
-  private final FilenamePolicy filenamePolicy;
   /** The directory to which files will be written. */
-  private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
-
-  /**
-   * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
-   */
-  @Experimental(Kind.FILESYSTEM)
-  public FileBasedSink(
-      ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
-    this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
-  }
+  private final ValueProvider<ResourceId> tempDirectoryProvider;
 
   private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
     @Override
@@ -340,95 +396,91 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /**
-   * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
+   * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files.
    */
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
-      ValueProvider<ResourceId> baseOutputDirectoryProvider,
-      FilenamePolicy filenamePolicy,
+      ValueProvider<ResourceId> tempDirectoryProvider,
+      DynamicDestinations<?, DestinationT> dynamicDestinations) {
+    this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
+  }
+
+  /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
+  @Experimental(Kind.FILESYSTEM)
+  public FileBasedSink(
+      ValueProvider<ResourceId> tempDirectoryProvider,
+      DynamicDestinations<?, DestinationT> dynamicDestinations,
       WritableByteChannelFactory writableByteChannelFactory) {
-    this.baseOutputDirectoryProvider =
-        NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
-    this.filenamePolicy = filenamePolicy;
+    this.tempDirectoryProvider =
+        NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
+    this.dynamicDestinations = checkNotNull(dynamicDestinations);
     this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
-  /**
-   * Returns the base directory inside which files will be written according to the configured
-   * {@link FilenamePolicy}.
-   */
-  @Experimental(Kind.FILESYSTEM)
-  public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
-    return baseOutputDirectoryProvider;
+  /** Return the {@link DynamicDestinations} used. */
+  @SuppressWarnings("unchecked")
+  public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
+    return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
   }
 
   /**
-   * Returns the policy by which files will be named inside of the base output directory. Note that
-   * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
-   * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
+   * Returns the directory inside which temprary files will be written according to the configured
+   * {@link FilenamePolicy}.
    */
   @Experimental(Kind.FILESYSTEM)
-  public final FilenamePolicy getFilenamePolicy() {
-    return filenamePolicy;
+  public ValueProvider<ResourceId> getTempDirectoryProvider() {
+    return tempDirectoryProvider;
   }
 
   public void validate(PipelineOptions options) {}
 
-  /**
-   * Return a subclass of {@link WriteOperation} that will manage the write
-   * to the sink.
-   */
-  public abstract WriteOperation<T> createWriteOperation();
+  /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
+  public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
-    getFilenamePolicy().populateDisplayData(builder);
+    getDynamicDestinations().populateDisplayData(builder);
   }
 
   /**
    * Abstract operation that manages the process of writing to {@link FileBasedSink}.
    *
-   * <p>The primary responsibilities of the WriteOperation is the management of output
-   * files. During a write, {@link Writer}s write bundles to temporary file
-   * locations. After the bundles have been written,
+   * <p>The primary responsibilities of the WriteOperation is the management of output files. During
+   * a write, {@link Writer}s write bundles to temporary file locations. After the bundles have been
+   * written,
+   *
    * <ol>
-   * <li>{@link WriteOperation#finalize} is given a list of the temporary
-   * files containing the output bundles.
-   * <li>During finalize, these temporary files are copied to final output locations and named
-   * according to a file naming template.
-   * <li>Finally, any temporary files that were created during the write are removed.
+   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+   *       output bundles.
+   *   <li>During finalize, these temporary files are copied to final output locations and named
+   *       according to a file naming template.
+   *   <li>Finally, any temporary files that were created during the write are removed.
    * </ol>
    *
-   * <p>Subclass implementations of WriteOperation must implement
-   * {@link WriteOperation#createWriter} to return a concrete
-   * FileBasedSinkWriter.
+   * <p>Subclass implementations of WriteOperation must implement {@link
+   * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter.
    *
-   * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
-   * files using the tempDirectory that can be provided via the constructor of
-   * WriteOperation. These temporary files will be named
-   * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
-   * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
-   * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
+   * <h2>Temporary and Output File Naming:</h2>
    *
-   * <p>Final output files are written to baseOutputFilename with the format
-   * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
-   * written and extension is the file extension. Both baseOutputFilename and extension are required
-   * constructor arguments.
+   * <p>During the write, bundles are written to temporary files using the tempDirectory that can be
+   * provided via the constructor of WriteOperation. These temporary files will be named {@code
+   * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if
+   * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723
+   * will be "gs://my-bucket/my_temp_output/15723".
    *
-   * <p>Subclass implementations can change the file naming template by supplying a value for
-   * fileNamingTemplate.
+   * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If
+   * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The
+   * directory that the files are written to is determined by the {@link FilenamePolicy} instance.
    *
    * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
    * files will occur.
    *
    * <p>If there are no elements in the PCollection being written, no output will be generated.
    *
-   * @param <T> the type of values written to the sink.
+   * @param <OutputT> the type of values written to the sink.
    */
-  public abstract static class WriteOperation<T> implements Serializable {
-    /**
-     * The Sink that this WriteOperation will write to.
-     */
-    protected final FileBasedSink<T> sink;
+  public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
+    /** The Sink that this WriteOperation will write to. */
+    protected final FileBasedSink<OutputT, DestinationT> sink;
 
     /** Directory for temporary output files. */
     protected final ValueProvider<ResourceId> tempDirectory;
@@ -445,17 +497,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     /**
-     * Constructs a WriteOperation using the default strategy for generating a temporary
-     * directory from the base output filename.
+     * Constructs a WriteOperation using the default strategy for generating a temporary directory
+     * from the base output filename.
      *
-     * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
-     * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
+     * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if
+     * tempDirectory is /path/to/foo/, the temporary directory will be
+     * /path/to/foo/temp-beam-foo-$date.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
-    public WriteOperation(FileBasedSink<T> sink) {
-      this(sink, NestedValueProvider.of(
-          sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
+    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
+      this(
+          sink,
+          NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
     }
 
     private static class TemporaryDirectoryBuilder
@@ -471,10 +525,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       private final Long tempId = TEMP_COUNT.getAndIncrement();
 
       @Override
-      public ResourceId apply(ResourceId baseOutputDirectory) {
+      public ResourceId apply(ResourceId tempDirectory) {
         // Temp directory has a timestamp and a unique ID
         String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
-        return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
+        return tempDirectory
+            .getCurrentDirectory()
+            .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
       }
     }
 
@@ -485,22 +541,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @param tempDirectory the base directory to be used for temporary output files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
+    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
     private WriteOperation(
-        FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
+        FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
     }
 
     /**
-     * Clients must implement to return a subclass of {@link Writer}. This
-     * method must not mutate the state of the object.
+     * Clients must implement to return a subclass of {@link Writer}. This method must not mutate
+     * the state of the object.
      */
-    public abstract Writer<T> createWriter() throws Exception;
+    public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
 
     /**
      * Indicates that the operation will be performing windowed writes.
@@ -514,8 +570,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * removing temporary files.
      *
      * <p>Finalization may be overridden by subclass implementations to perform customized
-     * finalization (e.g., initiating some operation on output bundles, merging them, etc.).
-     * {@code writerResults} contains the filenames of written bundles.
+     * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
+     * writerResults} contains the filenames of written bundles.
      *
      * <p>If subclasses override this method, they must guarantee that its implementation is
      * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
@@ -523,7 +579,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * @param writerResults the results of writes (FileResult).
      */
-    public void finalize(Iterable<FileResult> writerResults) throws Exception {
+    public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
       // Collect names of temporary files and rename them.
       Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
       copyToOutputFiles(outputFilenames);
@@ -542,17 +598,14 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
     @Experimental(Kind.FILESYSTEM)
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
-        Iterable<FileResult> writerResults) {
+        Iterable<FileResult<DestinationT>> writerResults) {
       int numShards = Iterables.size(writerResults);
       Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
 
-      FilenamePolicy policy = getSink().getFilenamePolicy();
-      ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
-
       // Either all results have a shard number set (if the sink is configured with a fixed
       // number of shards), or they all don't (otherwise).
       Boolean isShardNumberSetEverywhere = null;
-      for (FileResult result : writerResults) {
+      for (FileResult<DestinationT> result : writerResults) {
         boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
         if (isShardNumberSetEverywhere == null) {
           isShardNumberSetEverywhere = isShardNumberSetHere;
@@ -568,7 +621,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         isShardNumberSetEverywhere = true;
       }
 
-      List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
+      List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
       if (isShardNumberSetEverywhere) {
         resultsWithShardNumbers = Lists.newArrayList(writerResults);
       } else {
@@ -577,29 +630,32 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         // case of triggers, the list of FileResult objects in the Finalize iterable is not
         // deterministic, and might change over retries. This breaks the assumption below that
         // sorting the FileResult objects provides idempotency.
-        List<FileResult> sortedByTempFilename =
+        List<FileResult<DestinationT>> sortedByTempFilename =
             Ordering.from(
-                new Comparator<FileResult>() {
-                  @Override
-                  public int compare(FileResult first, FileResult second) {
-                    String firstFilename = first.getTempFilename().toString();
-                    String secondFilename = second.getTempFilename().toString();
-                    return firstFilename.compareTo(secondFilename);
-                  }
-                })
+                    new Comparator<FileResult<DestinationT>>() {
+                      @Override
+                      public int compare(
+                          FileResult<DestinationT> first, FileResult<DestinationT> second) {
+                        String firstFilename = first.getTempFilename().toString();
+                        String secondFilename = second.getTempFilename().toString();
+                        return firstFilename.compareTo(secondFilename);
+                      }
+                    })
                 .sortedCopy(writerResults);
         for (int i = 0; i < sortedByTempFilename.size(); i++) {
           resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
         }
       }
 
-      for (FileResult result : resultsWithShardNumbers) {
+      for (FileResult<DestinationT> result : resultsWithShardNumbers) {
         checkArgument(
             result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
         outputFilenames.put(
             result.getTempFilename(),
             result.getDestinationFile(
-                policy, baseOutputDir, numShards, getSink().getExtension()));
+                getSink().getDynamicDestinations(),
+                numShards,
+                getSink().getWritableByteChannelFactory()));
       }
 
       int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
@@ -615,18 +671,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
      *
-     * <p>Files will be named according to the file naming template. The order of the output files
-     * will be the same as the sorted order of the input filenames.  In other words, if the input
-     * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and
-     * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to
-     * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
+     * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
+     * will be the same as the sorted order of the input filenames. In other words (when using
+     * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (int
+     * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
+     * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
+     * will be copied to dir/file-001-of-003.txt, etc.
      *
      * @param filenames the filenames of temporary files.
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
-    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
-        throws IOException {
+    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
       int numFiles = filenames.size();
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
@@ -698,10 +754,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       }
     }
 
-    /**
-     * Returns the FileBasedSink for this write operation.
-     */
-    public FileBasedSink<T> getSink() {
+    /** Returns the FileBasedSink for this write operation. */
+    public FileBasedSink<OutputT, DestinationT> getSink() {
       return sink;
     }
 
@@ -719,33 +773,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
-  /** Returns the extension that will be written to the produced files. */
-  protected final String getExtension() {
-    String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
-    if (!extension.isEmpty() && !extension.startsWith(".")) {
-      extension = "." + extension;
-    }
-    return extension;
+  /** Returns the {@link WritableByteChannelFactory} used. */
+  protected final WritableByteChannelFactory getWritableByteChannelFactory() {
+    return writableByteChannelFactory;
   }
 
   /**
-   * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
-   * implementations provide a method that can write a single value to a
-   * {@link WritableByteChannel}.
+   * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations
+   * provide a method that can write a single value to a {@link WritableByteChannel}.
    *
    * <p>Subclass implementations may also override methods that write headers and footers before and
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
    * channel.
    *
-   * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore
-   * any access to static members or methods should be thread safe.
+   * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any
+   * access to static members or methods should be thread safe.
    *
-   * @param <T> the type of values to write.
+   * @param <OutputT> the type of values to write.
    */
-  public abstract static class Writer<T> {
+  public abstract static class Writer<OutputT, DestinationT> {
     private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
-    private final WriteOperation<T> writeOperation;
+    private final WriteOperation<OutputT, DestinationT> writeOperation;
 
     /** Unique id for this output bundle. */
     private String id;
@@ -753,6 +802,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private BoundedWindow window;
     private PaneInfo paneInfo;
     private int shard = -1;
+    private DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
@@ -772,10 +822,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      */
     private final String mimeType;
 
-    /**
-     * Construct a new {@link Writer} that will produce files of the given MIME type.
-     */
-    public Writer(WriteOperation<T> writeOperation, String mimeType) {
+    /** Construct a new {@link Writer} that will produce files of the given MIME type. */
+    public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
       this.mimeType = mimeType;
@@ -818,28 +866,29 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * id populated for the case of static sharding. In cases where the runner is dynamically
      * picking sharding, shard might be set to -1.
      */
-    public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
+    public final void openWindowed(
+        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
         throws Exception {
       if (!getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openWindowed called a non-windowed sink.");
       }
-      open(uId, window, paneInfo, shard);
+      open(uId, window, paneInfo, shard, destination);
     }
 
     /**
      * Called for each value in the bundle.
      */
-    public abstract void write(T value) throws Exception;
+    public abstract void write(OutputT value) throws Exception;
 
     /**
-     * Similar to {@link #openWindowed} however for the case where unwindowed writes were
-     * requested.
+     * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
      */
-    public final void openUnwindowed(String uId, int shard) throws Exception {
+    public final void openUnwindowed(String uId, int shard, DestinationT destination)
+        throws Exception {
       if (getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openUnwindowed called a windowed sink.");
       }
-      open(uId, null, null, shard);
+      open(uId, null, null, shard, destination);
     }
 
     // Helper function to close a channel, on exception cases.
@@ -855,14 +904,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       }
     }
 
-    private void open(String uId,
-                      @Nullable BoundedWindow window,
-                      @Nullable PaneInfo paneInfo,
-                      int shard) throws Exception {
+    private void open(
+        String uId,
+        @Nullable BoundedWindow window,
+        @Nullable PaneInfo paneInfo,
+        int shard,
+        DestinationT destination)
+        throws Exception {
       this.id = uId;
       this.window = window;
       this.paneInfo = paneInfo;
       this.shard = shard;
+      this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
       verifyNotNull(
@@ -908,7 +961,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     /** Closes the channel and returns the bundle result. */
-    public final FileResult close() throws Exception {
+    public final FileResult<DestinationT> close() throws Exception {
       checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
 
       LOG.debug("Writing footer to {}.", outputFile);
@@ -938,35 +991,41 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
       }
 
-      FileResult result = new FileResult(outputFile, shard, window, paneInfo);
+      FileResult<DestinationT> result =
+          new FileResult<>(outputFile, shard, window, paneInfo, destination);
       LOG.debug("Result for bundle {}: {}", this.id, outputFile);
       return result;
     }
 
-    /**
-     * Return the WriteOperation that this Writer belongs to.
-     */
-    public WriteOperation<T> getWriteOperation() {
+    /** Return the WriteOperation that this Writer belongs to. */
+    public WriteOperation<OutputT, DestinationT> getWriteOperation() {
       return writeOperation;
     }
   }
 
   /**
-   * Result of a single bundle write. Contains the filename produced by the bundle, and if known
-   * the final output filename.
+   * Result of a single bundle write. Contains the filename produced by the bundle, and if known the
+   * final output filename.
    */
-  public static final class FileResult {
+  public static final class FileResult<DestinationT> {
     private final ResourceId tempFilename;
     private final int shard;
     private final BoundedWindow window;
     private final PaneInfo paneInfo;
+    private final DestinationT destination;
 
     @Experimental(Kind.FILESYSTEM)
-    public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
+    public FileResult(
+        ResourceId tempFilename,
+        int shard,
+        BoundedWindow window,
+        PaneInfo paneInfo,
+        DestinationT destination) {
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
       this.paneInfo = paneInfo;
+      this.destination = destination;
     }
 
     @Experimental(Kind.FILESYSTEM)
@@ -978,8 +1037,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       return shard;
     }
 
-    public FileResult withShard(int shard) {
-      return new FileResult(tempFilename, shard, window, paneInfo);
+    public FileResult<DestinationT> withShard(int shard) {
+      return new FileResult<>(tempFilename, shard, window, paneInfo, destination);
     }
 
     public BoundedWindow getWindow() {
@@ -990,17 +1049,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       return paneInfo;
     }
 
+    public DestinationT getDestination() {
+      return destination;
+    }
+
     @Experimental(Kind.FILESYSTEM)
-    public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
-                                         int numShards, String extension) {
+    public ResourceId getDestinationFile(
+        DynamicDestinations<?, DestinationT> dynamicDestinations,
+        int numShards,
+        OutputFileHints outputFileHints) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);
       checkArgument(numShards > 0);
+      FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
       if (getWindow() != null) {
-        return policy.windowedFilename(outputDirectory, new WindowedContext(
-            getWindow(), getPaneInfo(), getShard(), numShards), extension);
+        return policy.windowedFilename(
+            new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
+            outputFileHints);
       } else {
-        return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
-            extension);
+        return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
       }
     }
 
@@ -1014,22 +1080,24 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
-  /**
-   * A coder for {@link FileResult} objects.
-   */
-  public static final class FileResultCoder extends StructuredCoder<FileResult> {
+  /** A coder for {@link FileResult} objects. */
+  public static final class FileResultCoder<DestinationT>
+      extends StructuredCoder<FileResult<DestinationT>> {
     private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
     private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
     private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
-
     private final Coder<BoundedWindow> windowCoder;
+    private final Coder<DestinationT> destinationCoder;
 
-    protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
+    protected FileResultCoder(
+        Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
       this.windowCoder = NullableCoder.of(windowCoder);
+      this.destinationCoder = destinationCoder;
     }
 
-    public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
-      return new FileResultCoder(windowCoder);
+    public static <DestinationT> FileResultCoder<DestinationT> of(
+        Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
+      return new FileResultCoder<>(windowCoder, destinationCoder);
     }
 
     @Override
@@ -1038,8 +1106,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
 
     @Override
-    public void encode(FileResult value, OutputStream outStream)
-        throws IOException {
+    public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
@@ -1047,17 +1114,22 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       windowCoder.encode(value.getWindow(), outStream);
       PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
       SHARD_CODER.encode(value.getShard(), outStream);
+      destinationCoder.encode(value.getDestination(), outStream);
     }
 
     @Override
-    public FileResult decode(InputStream inStream)
-        throws IOException {
+    public FileResult<DestinationT> decode(InputStream inStream) throws IOException {
       String tempFilename = FILENAME_CODER.decode(inStream);
       BoundedWindow window = windowCoder.decode(inStream);
       PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
       int shard = SHARD_CODER.decode(inStream);
-      return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
-          shard, window, paneInfo);
+      DestinationT destination = destinationCoder.decode(inStream);
+      return new FileResult<>(
+          FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+          shard,
+          window,
+          paneInfo,
+          destination);
     }
 
     @Override
@@ -1066,25 +1138,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       windowCoder.verifyDeterministic();
       PANE_INFO_CODER.verifyDeterministic();
       SHARD_CODER.verifyDeterministic();
+      destinationCoder.verifyDeterministic();
     }
   }
 
   /**
-   * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
-   * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
-   * would normally be written directly to the {@link WritableByteChannel} passed into
-   * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
-   *
-   * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
-   * building {@link DisplayData}.
+   * Provides hints about how to generate output files, such as a suggested filename suffix (e.g.
+   * based on the compression type), and the file MIME type.
    */
-  public interface WritableByteChannelFactory extends Serializable {
-    /**
-     * @param channel the {@link WritableByteChannel} to wrap
-     * @return the {@link WritableByteChannel} to be used during output
-     */
-    WritableByteChannel create(WritableByteChannel channel) throws IOException;
-
+  public interface OutputFileHints extends Serializable {
     /**
      * Returns the MIME type that should be used for the files that will hold the output data. May
      * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
@@ -1101,6 +1163,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
      */
     @Nullable
-    String getFilenameSuffix();
+    String getSuggestedFilenameSuffix();
+  }
+
+  /**
+   * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+   * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+   * would normally be written directly to the {@link WritableByteChannel} passed into {@link
+   * WritableByteChannelFactory#create(WritableByteChannel)}.
+   *
+   * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
+   * building {@link DisplayData}.
+   */
+  public interface WritableByteChannelFactory extends OutputFileHints {
+    /**
+     * @param channel the {@link WritableByteChannel} to wrap
+     * @return the {@link WritableByteChannel} to be used during output
+     */
+    WritableByteChannel create(WritableByteChannel channel) throws IOException;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index e288075..6e7b243 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -355,12 +356,11 @@ public class TFRecordIO {
     public PDone expand(PCollection<byte[]> input) {
       checkState(getOutputPrefix() != null,
           "need to set the output prefix of a TFRecordIO.Write transform");
-      WriteFiles<byte[]> write = WriteFiles.to(
+      WriteFiles<byte[], Void, byte[]> write =
+          WriteFiles.<byte[], Void, byte[]>to(
               new TFRecordSink(
-                  getOutputPrefix(),
-                  getShardTemplate(),
-                  getFilenameSuffix(),
-                  getCompressionType()));
+                  getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
+              SerializableFunctions.<byte[]>identity());
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -546,20 +546,20 @@ public class TFRecordIO {
     }
   }
 
-  /**
-   * A {@link FileBasedSink} for TFRecord files. Produces TFRecord files.
-   */
+  /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
   @VisibleForTesting
-  static class TFRecordSink extends FileBasedSink<byte[]> {
+  static class TFRecordSink extends FileBasedSink<byte[], Void> {
     @VisibleForTesting
-    TFRecordSink(ValueProvider<ResourceId> outputPrefix,
+    TFRecordSink(
+        ValueProvider<ResourceId> outputPrefix,
         @Nullable String shardTemplate,
         @Nullable String suffix,
         TFRecordIO.CompressionType compressionType) {
       super(
           outputPrefix,
-          DefaultFilenamePolicy.constructUsingStandardParameters(
-              outputPrefix, shardTemplate, suffix, false),
+          DynamicFileDestinations.constant(
+              DefaultFilenamePolicy.fromStandardParameters(
+                  outputPrefix, shardTemplate, suffix, false)),
           writableByteChannelFactory(compressionType));
     }
 
@@ -571,7 +571,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public WriteOperation<byte[]> createWriteOperation() {
+    public WriteOperation<byte[], Void> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 
@@ -590,30 +590,24 @@ public class TFRecordIO {
       return CompressionType.UNCOMPRESSED;
     }
 
-    /**
-     * A {@link WriteOperation
-     * WriteOperation} for TFRecord files.
-     */
-    private static class TFRecordWriteOperation extends WriteOperation<byte[]> {
+    /** A {@link WriteOperation WriteOperation} for TFRecord files. */
+    private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
       private TFRecordWriteOperation(TFRecordSink sink) {
         super(sink);
       }
 
       @Override
-      public Writer<byte[]> createWriter() throws Exception {
+      public Writer<byte[], Void> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }
 
-    /**
-     * A {@link Writer Writer}
-     * for TFRecord files.
-     */
-    private static class TFRecordWriter extends Writer<byte[]> {
+    /** A {@link Writer Writer} for TFRecord files. */
+    private static class TFRecordWriter extends Writer<byte[], Void> {
       private WritableByteChannel outChannel;
       private TFRecordCodec codec;
 
-      private TFRecordWriter(WriteOperation<byte[]> writeOperation) {
+      private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/77ba7a35/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f1eb7c0..5241589 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
@@ -37,6 +40,7 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -65,19 +69,8 @@ import org.apache.beam.sdk.values.PDone;
  * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner -
- * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
- * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
- * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} can also be
- * set in case you need better control over naming files created by unique windows.
- * {@link DefaultFilenamePolicy} policy for producing unique filenames might not be appropriate
- * for your use case.
- *
- * <p>Any existing files with the same names as generated output files will be overwritten.
- *
  * <p>For example:
+ *
  * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
@@ -85,10 +78,49 @@ import org.apache.beam.sdk.values.PDone;
  *
  * // Same as above, only with Gzip compression:
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.write().to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"))
  *      .withSuffix(".txt")
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may need not set it yourself. If setting an explicit template using {@link
+ * TextIO.Write#withShardNameTemplate(String)}, make sure that the template contains placeholders
+ * for the window and the pane; W is expanded into the window text, and P into the pane; the default
+ * template will include both the window and the pane in the filename.
+ *
+ * <p>If you want better control over how filenames are generated than the default policy allows, a
+ * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ *
+ * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
+ * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
+ * allows you to convert any input value into a custom destination object, and map that destination
+ * object to a {@link FilenamePolicy}. This allows using different filename policies (or more
+ * commonly, differently-configured instances of the same policy) based on the input record. Often
+ * this is used in conjunction with {@link TextIO#writeCustomType(SerializableFunction)}, which
+ * allows your {@link DynamicDestinations} object to examine the input type and takes a format
+ * function to convert that type to a string for writing.
+ *
+ * <p>A convenience shortcut is provided for the case where the default naming policy is used, but
+ * different configurations of this policy are wanted based on the input record. Default naming
+ * policies can be configured using the {@link DefaultFilenamePolicy.Params} object.
+ *
+ * <pre>{@code
+ * PCollection<UserEvent>> lines = ...;
+ * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent())
+ *      .to(new SerializableFunction<UserEvent, Params>() {
+ *         public String apply(UserEvent value) {
+ *           return new Params().withBaseFilename(baseDirectory + "/" + value.country());
+ *         }
+ *       }),
+ *       new Params().withBaseFilename(baseDirectory + "/empty");
+ * }</pre>
+ *
+ * <p>Any existing files with the same names as generated output files will be overwritten.
  */
 public class TextIO {
   /**
@@ -105,11 +137,29 @@ public class TextIO {
    * line.
    */
   public static Write write() {
-    return new AutoValue_TextIO_Write.Builder()
+    return new TextIO.Write();
+  }
+
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * line.
+   *
+   * <p>This version allows you to apply {@link TextIO} writes to a PCollection of a custom type
+   * {@link T}, along with a format function that converts the input type {@link T} to the String
+   * that will be written to the file. The advantage of this is it allows a user-provided {@link
+   * DynamicDestinations} object, set via {@link Write#to(DynamicDestinations)} to examine the
+   * user's custom type when choosing a destination.
+   */
+  public static <T> TypedWrite<T> writeCustomType(SerializableFunction<T, String> formatFunction) {
+    return new AutoValue_TextIO_TypedWrite.Builder<T>()
         .setFilenamePrefix(null)
+        .setTempDirectory(null)
         .setShardTemplate(null)
         .setFilenameSuffix(null)
         .setFilenamePolicy(null)
+        .setDynamicDestinations(null)
+        .setFormatFunction(formatFunction)
         .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
         .setNumShards(0)
@@ -223,18 +273,21 @@ public class TextIO {
     }
   }
 
-
-  /////////////////////////////////////////////////////////////////////////////
+  // ///////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
   @AutoValue
-  public abstract static class Write extends PTransform<PCollection<String>, PDone> {
+  public abstract static class TypedWrite<T> extends PTransform<PCollection<T>, PDone> {
     /** The prefix of each file written, combined with suffix and shardTemplate. */
     @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     /** The suffix of each file written, combined with prefix and shardTemplate. */
     @Nullable abstract String getFilenameSuffix();
 
+    /** The base directory used for generating temporary files. */
+    @Nullable
+    abstract ValueProvider<ResourceId> getTempDirectory();
+
     /** An optional header to add to each file. */
     @Nullable abstract String getHeader();
 
@@ -250,6 +303,13 @@ public class TextIO {
     /** A policy for naming output files. */
     @Nullable abstract FilenamePolicy getFilenamePolicy();
 
+    /** Allows for value-dependent {@link DynamicDestinations} to be vended. */
+    @Nullable
+    abstract DynamicDestinations<T, ?> getDynamicDestinations();
+
+    /** A function that converts T to a String, for writing to the file. */
+    abstract SerializableFunction<T, String> getFormatFunction();
+
     /** Whether to write windowed output files. */
     abstract boolean getWindowedWrites();
 
@@ -259,66 +319,68 @@ public class TextIO {
      */
     abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    abstract Builder toBuilder();
+    abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
-      abstract Builder setShardTemplate(@Nullable String shardTemplate);
-      abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
-      abstract Builder setHeader(@Nullable String header);
-      abstract Builder setFooter(@Nullable String footer);
-      abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
-      abstract Builder setNumShards(int numShards);
-      abstract Builder setWindowedWrites(boolean windowedWrites);
-      abstract Builder setWritableByteChannelFactory(
+    abstract static class Builder<T> {
+      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+
+      abstract Builder<T> setTempDirectory(ValueProvider<ResourceId> tempDirectory);
+
+      abstract Builder<T> setShardTemplate(@Nullable String shardTemplate);
+
+      abstract Builder<T> setFilenameSuffix(@Nullable String filenameSuffix);
+
+      abstract Builder<T> setHeader(@Nullable String header);
+
+      abstract Builder<T> setFooter(@Nullable String footer);
+
+      abstract Builder<T> setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
+
+      abstract Builder<T> setDynamicDestinations(
+          @Nullable DynamicDestinations<T, ?> dynamicDestinations);
+
+      abstract Builder<T> setFormatFunction(SerializableFunction<T, String> formatFunction);
+
+      abstract Builder<T> setNumShards(int numShards);
+
+      abstract Builder<T> setWindowedWrites(boolean windowedWrites);
+
+      abstract Builder<T> setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
 
-      abstract Write build();
+      abstract TypedWrite<T> build();
     }
 
     /**
-     * Writes to text files with the given prefix. The given {@code prefix} can reference any
-     * {@link FileSystem} on the classpath.
-     *
-     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     * Writes to text files with the given prefix. The given {@code prefix} can reference any {@link
+     * FileSystem} on the classpath. This prefix is used by the {@link DefaultFilenamePolicy} to
+     * generate filenames.
      *
      * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
-     * to define the base output directory and file prefix, a shard identifier (see
-     * {@link #withNumShards(int)}), and a common suffix (if supplied using
-     * {@link #withSuffix(String)}).
+     * to define the base output directory and file prefix, a shard identifier (see {@link
+     * #withNumShards(int)}), and a common suffix (if supplied using {@link #withSuffix(String)}).
+     *
+     * <p>This default policy can be overridden using {@link #to(FilenamePolicy)}, in which case
+     * {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should not be set.
+     * Custom filename policies do not automatically see this prefix - you should explicitly pass
+     * the prefix into your {@link FilenamePolicy} object if you need this.
      *
-     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
-     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
-     * not be set.
+     * <p>If {@link #withTempDirectory} has not been called, this filename prefix will be used to
+     * infer a directory for temporary files.
      */
-    public Write to(String filenamePrefix) {
+    public TypedWrite<T> to(String filenamePrefix) {
       return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
-    /**
-     * Writes to text files with prefix from the given resource.
-     *
-     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
-     *
-     * <p>By default, a {@link DefaultFilenamePolicy} will be used built using the specified prefix
-     * to define the base output directory and file prefix, a shard identifier (see
-     * {@link #withNumShards(int)}), and a common suffix (if supplied using
-     * {@link #withSuffix(String)}).
-     *
-     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
-     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
-     * not be set.
-     */
+    /** Like {@link #to(String)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write to(ResourceId filenamePrefix) {
+    public TypedWrite<T> to(ResourceId filenamePrefix) {
       return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
-    /**
-     * Like {@link #to(String)}.
-     */
-    public Write to(ValueProvider<String> outputPrefix) {
+    /** Like {@link #to(String)}. */
+    public TypedWrite<T> to(ValueProvider<String> outputPrefix) {
       return toResource(NestedValueProvider.of(outputPrefix,
           new SerializableFunction<String, ResourceId>() {
             @Override
@@ -329,43 +391,77 @@ public class TextIO {
     }
 
     /**
-     * Like {@link #to(ResourceId)}.
+     * Writes to files named according to the given {@link FileBasedSink.FilenamePolicy}. A
+     * directory for temporary files must be specified using {@link #withTempDirectory}.
      */
+    public TypedWrite<T> to(FilenamePolicy filenamePolicy) {
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    }
+
+    /**
+     * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These
+     * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
+     * temporary files must be specified using {@link #withTempDirectory}.
+     */
+    public TypedWrite<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
+      return toBuilder().setDynamicDestinations(dynamicDestinations).build();
+    }
+
+    /**
+     * Write to dynamic destinations using the default filename policy. The destinationFunction maps
+     * the input record to a {@link DefaultFilenamePolicy.Params} object that specifies where the
+     * records should be written (base filename, file suffix, and shard template). The
+     * emptyDestination parameter specified where empty files should be written for when the written
+     * {@link PCollection} is empty.
+     */
+    public TypedWrite<T> to(
+        SerializableFunction<T, Params> destinationFunction, Params emptyDestination) {
+      return to(DynamicFileDestinations.toDefaultPolicies(destinationFunction, emptyDestination));
+    }
+
+    /** Like {@link #to(ResourceId)}. */
     @Experimental(Kind.FILESYSTEM)
-    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+    public TypedWrite<T> toResource(ValueProvider<ResourceId> filenamePrefix) {
       return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
+    /** Set the base directory used to generate temporary files. */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<T> withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+      return toBuilder().setTempDirectory(tempDirectory).build();
+    }
+
+    /** Set the base directory used to generate temporary files. */
+    @Experimental(Kind.FILESYSTEM)
+    public TypedWrite<T> withTempDirectory(ResourceId tempDirectory) {
+      return withTempDirectory(StaticValueProvider.of(tempDirectory));
+    }
+
     /**
      * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
-     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     * used when using one of the default filename-prefix to() overrides - i.e. not when using
+     * either {@link #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write withShardNameTemplate(String shardTemplate) {
+    public TypedWrite<T> withShardNameTemplate(String shardTemplate) {
       return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
-     * Configures the filename suffix for written files. This option may only be used when
-     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     * Configures the filename suffix for written files. This option may only be used when using one
+     * of the default filename-prefix to() overrides - i.e. not when using either {@link
+     * #to(FilenamePolicy)} or {@link #to(DynamicDestinations)}.
      *
      * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
      * used.
      */
-    public Write withSuffix(String filenameSuffix) {
+    public TypedWrite<T> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
-     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
-     */
-    public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
-      return toBuilder().setFilenamePolicy(filenamePolicy).build();
-    }
-
-    /**
      * Configures the number of output shards produced overall (when using unwindowed writes) or
      * per-window (when using windowed writes).
      *
@@ -375,14 +471,13 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public Write withNumShards(int numShards) {
+    public TypedWrite<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
     /**
-     * Forces a single file as output and empty shard name template. This option is only compatible
-     * with unwindowed writes.
+     * Forces a single file as output and empty shard name template.
      *
      * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
      * performance of a pipeline. Setting this value is not recommended unless you require a
@@ -390,7 +485,7 @@ public class TextIO {
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public Write withoutSharding() {
+    public TypedWrite<T> withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
     }
 
@@ -399,7 +494,7 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured header.
      */
-    public Write withHeader(@Nullable String header) {
+    public TypedWrite<T> withHeader(@Nullable String header) {
       return toBuilder().setHeader(header).build();
     }
 
@@ -408,48 +503,82 @@ public class TextIO {
      *
      * <p>A {@code null} value will clear any previously configured footer.
      */
-    public Write withFooter(@Nullable String footer) {
+    public TypedWrite<T> withFooter(@Nullable String footer) {
       return toBuilder().setFooter(footer).build();
     }
 
     /**
-     * Returns a transform for writing to text files like this one but that has the given
-     * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
-     * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     * Returns a transform for writing to text files like this one but that has the given {@link
+     * WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
+     * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
      *
      * <p>A {@code null} value will reset the value to the default value mentioned above.
      */
-    public Write withWritableByteChannelFactory(
+    public TypedWrite<T> withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
       return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
 
-    public Write withWindowedWrites() {
+    /**
+     * Preserves windowing of input elements and writes them to files based on the element's window.
+     *
+     * <p>If using {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated using
+     * {@link FilenamePolicy#windowedFilename}. See also {@link WriteFiles#withWindowedWrites()}.
+     */
+    public TypedWrite<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
     }
 
+    private DynamicDestinations<T, ?> resolveDynamicDestinations() {
+      DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
+      if (dynamicDestinations == null) {
+        FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+        if (usedFilenamePolicy == null) {
+          usedFilenamePolicy =
+              DefaultFilenamePolicy.fromStandardParameters(
+                  getFilenamePrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getWindowedWrites());
+        }
+        dynamicDestinations = DynamicFileDestinations.constant(usedFilenamePolicy);
+      }
+      return dynamicDestinations;
+    }
+
     @Override
-    public PDone expand(PCollection<String> input) {
-      checkState(getFilenamePrefix() != null,
-          "Need to set the filename prefix of a TextIO.Write transform.");
+    public PDone expand(PCollection<T> input) {
+      checkState(
+          getFilenamePrefix() != null || getTempDirectory() != null,
+          "Need to set either the filename prefix or the tempDirectory of a TextIO.Write "
+              + "transform.");
       checkState(
-          (getFilenamePolicy() == null)
-              || (getShardTemplate() == null && getFilenameSuffix() == null),
-          "Cannot set a filename policy and also a filename template or suffix.");
-
-      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
-      if (usedFilenamePolicy == null) {
-        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
-            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix(), getWindowedWrites());
+          getFilenamePolicy() == null || getDynamicDestinations() == null,
+          "Cannot specify both a filename policy and dynamic destinations");
+      if (getFilenamePolicy() != null || getDynamicDestinations() != null) {
+        checkState(
+            getShardTemplate() == null && getFilenameSuffix() == null,
+            "shardTemplate and filenameSuffix should only be used with the default "
+                + "filename policy");
       }
-      WriteFiles<String> write =
+      return expandTyped(input, resolveDynamicDestinations());
+    }
+
+    public <DestinationT> PDone expandTyped(
+        PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+      ValueProvider<ResourceId> tempDirectory = getTempDirectory();
+      if (tempDirectory == null) {
+        tempDirectory = getFilenamePrefix();
+      }
+      WriteFiles<T, DestinationT, String> write =
           WriteFiles.to(
-              new TextSink(
-                  getFilenamePrefix(),
-                  usedFilenamePolicy,
+              new TextSink<>(
+                  tempDirectory,
+                  dynamicDestinations,
                   getHeader(),
                   getFooter(),
-                  getWritableByteChannelFactory()));
+                  getWritableByteChannelFactory()),
+              getFormatFunction());
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -463,27 +592,26 @@ public class TextIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String prefixString = "";
-      if (getFilenamePrefix() != null) {
-        prefixString = getFilenamePrefix().isAccessible()
-            ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString();
+      resolveDynamicDestinations().populateDisplayData(builder);
+      String tempDirectory = null;
+      if (getTempDirectory() != null) {
+        tempDirectory =
+            getTempDirectory().isAccessible()
+                ? getTempDirectory().get().toString()
+                : getTempDirectory().toString();
       }
       builder
-          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
-            .withLabel("Output File Prefix"))
-          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
-            .withLabel("Output File Suffix"))
-          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
-            .withLabel("Output Shard Name Template"))
-          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
-            .withLabel("Maximum Output Shards"), 0)
-          .addIfNotNull(DisplayData.item("fileHeader", getHeader())
-            .withLabel("File Header"))
-          .addIfNotNull(DisplayData.item("fileFooter", getFooter())
-              .withLabel("File Footer"))
-          .add(DisplayData
-              .item("writableByteChannelFactory", getWritableByteChannelFactory().toString())
-              .withLabel("Compression/Transformation Type"));
+          .addIfNotDefault(
+              DisplayData.item("numShards", getNumShards()).withLabel("Maximum Output Shards"), 0)
+          .addIfNotNull(
+              DisplayData.item("tempDirectory", tempDirectory)
+                  .withLabel("Directory for temporary files"))
+          .addIfNotNull(DisplayData.item("fileHeader", getHeader()).withLabel("File Header"))
+          .addIfNotNull(DisplayData.item("fileFooter", getFooter()).withLabel("File Footer"))
+          .add(
+              DisplayData.item(
+                      "writableByteChannelFactory", getWritableByteChannelFactory().toString())
+                  .withLabel("Compression/Transformation Type"));
     }
 
     @Override
@@ -493,6 +621,128 @@ public class TextIO {
   }
 
   /**
+   * This class is used as the default return value of {@link TextIO#write()}.
+   *
+   * <p>All methods in this class delegate to the appropriate method of {@link TextIO.TypedWrite}.
+   * This class exists for backwards compatibility, and will be removed in Beam 3.0.
+   */
+  public static class Write extends PTransform<PCollection<String>, PDone> {
+    @VisibleForTesting TypedWrite<String> inner;
+
+    Write() {
+      this(TextIO.writeCustomType(SerializableFunctions.<String>identity()));
+    }
+
+    Write(TypedWrite<String> inner) {
+      this.inner = inner;
+    }
+
+    /** See {@link TypedWrite#to(String)}. */
+    public Write to(String filenamePrefix) {
+      return new Write(inner.to(filenamePrefix));
+    }
+
+    /** See {@link TypedWrite#to(ResourceId)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write to(ResourceId filenamePrefix) {
+      return new Write(inner.to(filenamePrefix));
+    }
+
+    /** See {@link TypedWrite#to(ValueProvider)}. */
+    public Write to(ValueProvider<String> outputPrefix) {
+      return new Write(inner.to(outputPrefix));
+    }
+
+    /** See {@link TypedWrite#toResource(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+      return new Write(inner.toResource(filenamePrefix));
+    }
+
+    /** See {@link TypedWrite#to(FilenamePolicy)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write to(FilenamePolicy filenamePolicy) {
+      return new Write(inner.to(filenamePolicy));
+    }
+
+    /** See {@link TypedWrite#to(DynamicDestinations)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write to(DynamicDestinations<String, ?> dynamicDestinations) {
+      return new Write(inner.to(dynamicDestinations));
+    }
+
+    /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write to(
+        SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
+      return new Write(inner.to(destinationFunction, emptyDestination));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ValueProvider)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write withTempDirectory(ValueProvider<ResourceId> tempDirectory) {
+      return new Write(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withTempDirectory(ResourceId)}. */
+    @Experimental(Kind.FILESYSTEM)
+    public Write withTempDirectory(ResourceId tempDirectory) {
+      return new Write(inner.withTempDirectory(tempDirectory));
+    }
+
+    /** See {@link TypedWrite#withShardNameTemplate(String)}. */
+    public Write withShardNameTemplate(String shardTemplate) {
+      return new Write(inner.withShardNameTemplate(shardTemplate));
+    }
+
+    /** See {@link TypedWrite#withSuffix(String)}. */
+    public Write withSuffix(String filenameSuffix) {
+      return new Write(inner.withSuffix(filenameSuffix));
+    }
+
+    /** See {@link TypedWrite#withNumShards(int)}. */
+    public Write withNumShards(int numShards) {
+      return new Write(inner.withNumShards(numShards));
+    }
+
+    /** See {@link TypedWrite#withoutSharding()}. */
+    public Write withoutSharding() {
+      return new Write(inner.withoutSharding());
+    }
+
+    /** See {@link TypedWrite#withHeader(String)}. */
+    public Write withHeader(@Nullable String header) {
+      return new Write(inner.withHeader(header));
+    }
+
+    /** See {@link TypedWrite#withFooter(String)}. */
+    public Write withFooter(@Nullable String footer) {
+      return new Write(inner.withFooter(footer));
+    }
+
+    /** See {@link TypedWrite#withWritableByteChannelFactory(WritableByteChannelFactory)}. */
+    public Write withWritableByteChannelFactory(
+        WritableByteChannelFactory writableByteChannelFactory) {
+      return new Write(inner.withWritableByteChannelFactory(writableByteChannelFactory));
+    }
+
+    /** See {@link TypedWrite#withWindowedWrites}. */
+    public Write withWindowedWrites() {
+      return new Write(inner.withWindowedWrites());
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      inner.populateDisplayData(builder);
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      return inner.expand(input);
+    }
+  }
+
+  /**
    * Possible text file compression types.
    */
   public enum CompressionType {