Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/19 15:44:00 UTC

[jira] [Commented] (BEAM-2865) Implement FileIO.write()

    [ https://issues.apache.org/jira/browse/BEAM-2865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296986#comment-16296986 ] 

ASF GitHub Bot commented on BEAM-2865:
--------------------------------------

jkff closed pull request #3817: [BEAM-2865] Introduces FileIO.write() and uses it in AvroIO.
URL: https://github.com/apache/beam/pull/3817
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index eb13f3b2108..55024cca7da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -23,15 +23,23 @@
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.AvroCoder;
@@ -79,8 +87,8 @@
  * allows them in case the filepattern contains a glob wildcard character. Use {@link
  * Read#withEmptyMatchTreatment} to configure this behavior.
  *
- * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles}
- * allows streaming of new files matching the filepattern(s).
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} allows
+ * streaming of new files matching the filepattern(s).
  *
  * <h3>Reading records of a known schema</h3>
  *
@@ -144,6 +152,7 @@
  * }</pre>
  *
  * <h3>Streaming new files matching a filepattern</h3>
+ *
  * <pre>{@code
  * Pipeline p = ...;
  *
@@ -988,8 +997,11 @@ public ResourceId apply(String input) {
      * Use a {@link DynamicAvroDestinations} object to vend {@link FilenamePolicy} objects. These
      * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
      * temporary files must be specified using {@link #withTempDirectory}.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} instead.
      */
     @Experimental(Kind.FILESYSTEM)
+    @Deprecated
     public <NewDestinationT> TypedWrite<UserT, NewDestinationT, OutputT> to(
         DynamicAvroDestinations<UserT, NewDestinationT, OutputT> dynamicDestinations) {
       return toBuilder()
@@ -1238,7 +1250,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
           inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<T>identity()));
     }
 
-    /** See {@link TypedWrite#to(DynamicAvroDestinations)}. */
+    /**
+     * See {@link TypedWrite#to(DynamicAvroDestinations)}.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} instead.
+     */
+    @Deprecated
     public Write<T> to(DynamicAvroDestinations<T, ?, T> dynamicDestinations) {
       return new Write<>(inner.to(dynamicDestinations).withFormatFunction(null));
     }
@@ -1309,7 +1326,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      inner.expand(input);
+      input.apply(inner);
       return PDone.in(input.getPipeline());
     }
 
@@ -1333,6 +1350,118 @@ public void populateDisplayData(DisplayData.Builder builder) {
   }
   /////////////////////////////////////////////////////////////////////////////
 
+  /** Formats an element of a user type into a record with the given schema. */
+  public abstract static class RecordFormatter<ElementT> implements Serializable {
+    public abstract GenericRecord formatRecord(ElementT element, Schema schema);
+  }
+
+  /**
+   * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing
+   * elements of the given generated class, like {@link #write(Class)}.
+   */
+  public static <ElementT> Sink<ElementT> sink(final Class<ElementT> clazz) {
+    return new AutoValue_AvroIO_Sink.Builder<ElementT>()
+        .setJsonSchema(ReflectData.get().getSchema(clazz).toString())
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .build();
+  }
+
+  /**
+   * A {@link Sink} for use with {@link FileIO#write} and {@link FileIO#writeDynamic}, writing
+   * elements by converting each one to a {@link GenericRecord} with a given (common) schema, like
+   * {@link #writeCustomTypeToGenericRecords()}.
+   */
+  public static <ElementT> Sink<ElementT> sinkViaGenericRecords(
+      Schema schema, RecordFormatter<ElementT> formatter) {
+    return new AutoValue_AvroIO_Sink.Builder<ElementT>()
+        .setRecordFormatter(formatter)
+        .setJsonSchema(schema.toString())
+        .setMetadata(ImmutableMap.<String, Object>of())
+        .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
+        .build();
+  }
+
+  /** Implementation of {@link #sink} and {@link #sinkViaGenericRecords}. */
+  @AutoValue
+  public abstract static class Sink<ElementT> implements FileIO.Sink<ElementT> {
+    @Nullable abstract RecordFormatter<ElementT> getRecordFormatter();
+    @Nullable abstract String getJsonSchema();
+    abstract Map<String, Object> getMetadata();
+    abstract SerializableAvroCodecFactory getCodec();
+
+    abstract Builder<ElementT> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<ElementT> {
+      abstract Builder<ElementT> setRecordFormatter(RecordFormatter<ElementT> formatter);
+      abstract Builder<ElementT> setJsonSchema(String jsonSchema);
+      abstract Builder<ElementT> setMetadata(Map<String, Object> metadata);
+      abstract Builder<ElementT> setCodec(SerializableAvroCodecFactory codec);
+
+      abstract Sink<ElementT> build();
+    }
+
+    /** Specifies to put the given metadata into each generated file. By default, empty. */
+    public Sink<ElementT> withMetadata(Map<String, Object> metadata) {
+      return toBuilder().setMetadata(metadata).build();
+    }
+
+    /**
+     * Specifies to use the given {@link CodecFactory} for each generated file. By default, {@code
+     * CodecFactory.deflateCodec(6)}.
+     */
+    public Sink<ElementT> withCodec(CodecFactory codec) {
+      return toBuilder().setCodec(new SerializableAvroCodecFactory(codec)).build();
+    }
+
+    @Nullable private transient Schema schema;
+    @Nullable private transient DataFileWriter<ElementT> reflectWriter;
+    @Nullable private transient DataFileWriter<GenericRecord> genericWriter;
+
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      this.schema = new Schema.Parser().parse(getJsonSchema());
+      DataFileWriter<?> writer;
+      if (getRecordFormatter() == null) {
+        writer = reflectWriter = new DataFileWriter<>(new ReflectDatumWriter<ElementT>(schema));
+      } else {
+        writer =
+            genericWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
+      }
+      writer.setCodec(getCodec().getCodec());
+      for (Map.Entry<String, Object> entry : getMetadata().entrySet()) {
+        Object v = entry.getValue();
+        if (v instanceof String) {
+          writer.setMeta(entry.getKey(), (String) v);
+        } else if (v instanceof Long) {
+          writer.setMeta(entry.getKey(), (Long) v);
+        } else if (v instanceof byte[]) {
+          writer.setMeta(entry.getKey(), (byte[]) v);
+        } else {
+          throw new IllegalStateException(
+              "Metadata value type must be one of String, Long, or byte[]. Found "
+                  + v.getClass().getSimpleName());
+        }
+      }
+      writer.create(schema, Channels.newOutputStream(channel));
+    }
+
+    @Override
+    public void write(ElementT element) throws IOException {
+      if (getRecordFormatter() == null) {
+        reflectWriter.append(element);
+      } else {
+        genericWriter.append(getRecordFormatter().formatRecord(element, schema));
+      }
+    }
+
+    @Override
+    public void flush() throws IOException {
+      MoreObjects.firstNonNull(reflectWriter, genericWriter).flush();
+    }
+  }
+
   /** Disallow construction of utility class. */
   private AvroIO() {}
 }
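For orientation, here is a minimal usage sketch of the AvroIO.sink() added above, driven through the
new FileIO.write() entry point (the output path and the MyRecord POJO are hypothetical placeholders,
not part of this patch):

    // Write a PCollection<MyRecord> to Avro files; the schema is inferred via ReflectData.
    PCollection<MyRecord> records = ...;
    records.apply(FileIO.<MyRecord>write()
        .via(AvroIO.sink(MyRecord.class))
        .to("/path/to/output")
        .withPrefix("records")
        .withSuffix(".avro"));

For a custom type that should be converted to GenericRecords with a common schema, the patch also
adds AvroIO.sinkViaGenericRecords(schema, recordFormatter), which is used the same way.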
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
index 38ed73379dc..bcad1437736 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -68,7 +68,7 @@
    * write windowed files. In cases when user does specify shard template to be used then provided
    * template will be used for both windowed and non-windowed file names.
    */
-  private static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
+  public static final String DEFAULT_WINDOWED_SHARD_TEMPLATE =
       "W-P" + DEFAULT_UNWINDOWED_SHARD_TEMPLATE;
 
   /*
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2e5d387c63b..f036243c91f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -236,7 +236,7 @@ public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
       <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view);
     }
 
-    @Nullable private SideInputAccessor sideInputAccessor;
+    @Nullable private transient SideInputAccessor sideInputAccessor;
 
     static class SideInputAccessorViaProcessContext implements SideInputAccessor {
       private DoFn<?, ?>.ProcessContext processContext;
@@ -742,20 +742,21 @@ public void removeTemporaryFiles(Collection<ResourceId> filenames) throws IOExce
     final void moveToOutputFiles(
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
       int numFiles = resultsToFinalFilenames.size();
-      LOG.debug("Copying {} files.", numFiles);
-      List<ResourceId> srcFiles = new ArrayList<>();
-      List<ResourceId> dstFiles = new ArrayList<>();
-      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
-        srcFiles.add(entry.getKey().getTempFilename());
-        dstFiles.add(entry.getValue());
-        LOG.info(
-            "Will copy temporary file {} to final location {}",
-            entry.getKey().getTempFilename(),
-            entry.getValue());
-      }
-      // During a failure case, files may have been deleted in an earlier step. Thus
-      // we ignore missing files here.
-      FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+
+        LOG.debug("Copying {} files.", numFiles);
+        List<ResourceId> srcFiles = new ArrayList<>();
+        List<ResourceId> dstFiles = new ArrayList<>();
+        for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+          srcFiles.add(entry.getKey().getTempFilename());
+          dstFiles.add(entry.getValue());
+          LOG.info(
+              "Will copy temporary file {} to final location {}",
+              entry.getKey(),
+              entry.getValue());
+        }
+        // During a failure case, files may have been deleted in an earlier step. Thus
+        // we ignore missing files here.
+        FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
       removeTemporaryFiles(srcFiles);
     }
 
@@ -995,17 +996,16 @@ public final void close() throws Exception {
         closeChannelAndThrow(channel, outputFile, e);
       }
 
-      checkState(
-          channel.isOpen(),
-          "Channel %s to %s should only be closed by its owner: %s",
-          channel,
-          outputFile);
-
-      LOG.debug("Closing channel to {}.", outputFile);
-      try {
-        channel.close();
-      } catch (Exception e) {
-        throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
+      // It is valid for a subclass to either close the channel or not.
+      // They would typically close the channel e.g. if they are wrapping it in another channel
+      // and the wrapper needs to be closed.
+      if (channel.isOpen()) {
+        LOG.debug("Closing channel to {}.", outputFile);
+        try {
+          channel.close();
+        } catch (Exception e) {
+          throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
+        }
       }
       LOG.info("Successfully wrote temporary file {}", outputFile);
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 4e7124af8a5..daf3f4a6834 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -19,45 +19,284 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+import static org.apache.beam.sdk.transforms.Contextful.fn;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.Contextful;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Requirements;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.StreamUtils;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Transforms for working with files. Currently includes matching of filepatterns via {@link #match}
- * and {@link #matchAll}, and reading matches via {@link #readMatches}.
+ * General-purpose transforms for working with files: listing files (matching), reading and writing.
+ *
+ * <h2>Matching filepatterns</h2>
+ *
+ * <p>{@link #match} and {@link #matchAll} match filepatterns (respectively either a single
+ * filepattern or a {@link PCollection} thereof) and return the files that match them as {@link
+ * PCollection PCollections} of {@link MatchResult.Metadata}. Configuration options for them are in
+ * {@link MatchConfiguration} and include features such as treatment of filepatterns that don't
+ * match anything and continuous incremental matching of filepatterns (watching for new files).
+ *
+ * <h3>Example: Watching a single filepattern for new files</h3>
+ *
+ * <p>This example matches a single filepattern repeatedly every 30 seconds, continuously returns
+ * new matched files as an unbounded {@code PCollection<Metadata>} and stops if no new files appear
+ * for 1 hour.
+ *
+ * <pre>{@code
+ * PCollection<Metadata> matches = p.apply(FileIO.match()
+ *     .filepattern("...")
+ *     .continuously(
+ *       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
+ * <h3>Example: Matching a PCollection of filepatterns arriving from Kafka</h3>
+ *
+ * <p>This example reads filepatterns from Kafka and matches each one as it arrives, producing again
+ * an unbounded {@code PCollection<Metadata>}, and failing in case the filepattern doesn't match
+ * anything.
+ *
+ * <pre>{@code
+ * PCollection<String> filepatterns = p.apply(KafkaIO.read()...);
+ *
+ * PCollection<Metadata> matches = filepatterns.apply(FileIO.matchAll()
+ *     .withEmptyMatchTreatment(DISALLOW));
+ * }</pre>
+ *
+ * <h2>Reading files</h2>
+ *
+ * <p>{@link #readMatches} converts each result of {@link #match} or {@link #matchAll} to a {@link
+ * ReadableFile} that is convenient for reading a file's contents, optionally decompressing it.
+ *
+ * <h3>Example: Returning filenames and contents of compressed files matching a filepattern</h3>
+ *
+ * <p>This example matches a single filepattern and returns {@code KVs} of filenames and their
+ * contents as {@code String}, decompressing each file with GZIP.
+ *
+ * <pre>{@code
+ * PCollection<KV<String, String>> filesAndContents = p
+ *     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
+ *     // withCompression can be omitted - by default compression is detected from the filename.
+ *     .apply(FileIO.readMatches().withCompression(GZIP))
+ *     .apply(MapElements
+ *         // uses imports from TypeDescriptors
+ *         .into(kvs(strings(), strings()))
+ *         .via((ReadableFile f) -> KV.of(
+ *             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
+ * }</pre>
+ *
+ * <h2>Writing files</h2>
+ *
+ * <p>{@link #write} and {@link #writeDynamic} write elements from a {@link PCollection} of a given
+ * type to files, using a given {@link Sink} to write a set of elements to each file. The collection
+ * can be bounded or unbounded - in either case, writing happens by default per window and pane, and
+ * the amount of data in each window and pane is finite, so a finite number of files ("shards") are
+ * written for each window and pane. There are several aspects to this process:
+ *
+ * <ul>
+ *   <li><b>How many shards are generated per pane:</b> This is controlled by <i>sharding</i>, using
+ *       {@link Write#withNumShards} or {@link Write#withSharding}. The default is runner-specific,
+ *       so the number of shards will vary based on runner behavior, though at least 1 shard will
+ *       always be produced for every non-empty pane. Note that setting a fixed number of shards can
+ *       hurt performance: it adds an additional {@link GroupByKey} to the pipeline. However, it is
+ *       required to set it when writing an unbounded {@link PCollection} due to <a
+ *       href="https://issues.apache.org/jira/browse/BEAM-1438">BEAM-1438</a> and similar behavior
+ *       in other runners.
+ *   <li><b>How the shards are named:</b> This is controlled by a {@link Write.FileNaming}:
+ *       filenames can depend on a variety of inputs, e.g. the window, the pane, total number of
+ *       shards, the current file's shard index, and compression. Controlling the file naming is
+ *       described in the section <i>File naming</i> below.
+ *   <li><b>Which elements go into which shard:</b> Elements within a pane get distributed into
+ *       different shards created for that pane arbitrarily, though {@link FileIO.Write} attempts to
+ *       make shards approximately evenly sized. For more control over which elements go into which
+ *       files, consider using <i>dynamic destinations</i> (see below).
+ *   <li><b>How a given set of elements is written to a shard:</b> This is controlled by the {@link
+ *       Sink}, e.g. {@link AvroIO#sink} will generate Avro files. The {@link Sink} controls the
+ *       format of a single file: how to open a file, how to write each element to it, and how to
+ *       close the file - but it does not control the set of files or which elements go where.
+ *       Elements are written to a shard in an arbitrary order. {@link FileIO.Write} can
+ *       additionally compress the generated files using {@link FileIO.Write#withCompression}.
+ *   <li><b>How all of the above can be element-dependent:</b> This is controlled by <i>dynamic
+ *       destinations</i>. It is possible to have different groups of elements use different
+ *       policies for naming files and for configuring the {@link Sink}. See "dynamic destinations"
+ *       below.
+ * </ul>
+ *
+ * <h3>File naming</h3>
+ *
+ * <p>The names of generated files are produced by a {@link Write.FileNaming}. The default naming
+ * strategy is to name files in the format: {@code
+ * $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix}, where:
+ *
+ * <ul>
+ *   <li>$prefix is set by {@link Write#withPrefix}, the default is "output".
+ *   <li>$start and $end are boundaries of the window of data being written, formatted in ISO 8601
+ *       format (YYYY-mm-ddTHH:MM:SSZZZ). The window is omitted in case this is the global window.
+ *   <li>$pane is the index of the pane within the window. The pane is omitted in case it is known
+ *       to be the only pane for this window.
+ *   <li>$shard is the index of the current shard being written, out of the $numShards total shards
+ *       written for the current pane. Both are formatted using 5 digits (or more if necessary
+ *       according to $numShards) and zero-padded.
+ *   <li>$suffix is set by {@link Write#withSuffix}, the default is empty.
+ *   <li>$compressionSuffix is based on the default extension for the chosen
+ *   {@link Write#withCompression compression type}.
+ * </ul>
+ *
+ * <p>For example: {@code data-2017-12-01T19:00:00Z-2017-12-01T20:00:00Z-2-00010-of-00050.txt.gz}
+ *
+ * <p>Alternatively, one can specify a custom naming strategy using {@link
+ * Write#withNaming(Write.FileNaming)}.
+ *
+ * <p>If {@link Write#to} is specified, then the filenames produced by the {@link Write.FileNaming}
+ * are resolved relative to that directory.
+ *
+ * <p>When using dynamic destinations via {@link #writeDynamic} (see below), specifying a custom
+ * naming strategy is required, using {@link Write#withNaming(SerializableFunction)} or {@link
+ * Write#withNaming(Contextful)}. In those, pass a function that creates a {@link Write.FileNaming}
+ * for the requested group ("destination"). You can either implement a custom {@link
+ * Write.FileNaming}, or use {@link Write#defaultNaming} to configure the default naming strategy
+ * with a prefix and suffix as per above.
+ *
+ * <h3>Dynamic destinations</h3>
+ *
+ * <p>If the elements in the input collection can be partitioned into groups that should be treated
+ * differently, {@link FileIO.Write} supports different treatment per group ("destination"). It can
+ * use different file naming strategies for different groups, and can differently configure the
+ * {@link Sink}, e.g. write different elements to Avro files in different directories with different
+ * schemas.
+ *
+ * <p>This feature is supported by {@link #writeDynamic}. Use {@link Write#by} to specify how to
+ * partition the elements into groups ("destinations"). Then elements will be grouped by
+ * destination, and {@link Write#withNaming(Contextful)} and {@link Write#via(Contextful)} will be
+ * applied separately within each group, i.e. different groups will be written using the file naming
+ * strategies returned by {@link Write#withNaming(Contextful)} and using sinks returned by {@link
+ * Write#via(Contextful)} for the respective destinations. Note that currently sharding can not be
+ * destination-dependent: every window/pane for every destination will use the same number of shards
+ * specified via {@link Write#withNumShards} or {@link Write#withSharding}.
+ *
+ * <h3>Writing custom types to sinks</h3>
+ *
+ * <p>Normally, when writing a collection of a custom type using a {@link Sink} that takes a
+ * different type (for example, writing a {@code PCollection<Event>} to a text-based {@code
+ * Sink<String>}), one can simply apply a {@code ParDo} or {@code MapElements} to convert the custom
+ * type to the sink's <i>output type</i>.
+ *
+ * <p>However, when using dynamic destinations, in many such cases the destination needs to be
+ * extracted from the original type, so such a conversion is not possible. For example, one might
+ * write events of a custom class {@code Event} to a text sink, using the event's "type" as a
+ * destination. In that case, specify an <i>output function</i> in {@link Write#via(Contextful,
+ * Contextful)} or {@link Write#via(Contextful, Sink)}.
+ *
+ * <h3>Example: Writing CSV files</h3>
+ *
+ * <pre>{@code
+ * class CSVSink implements FileIO.Sink<List<String>> {
+ *   private String header;
+ *   private PrintWriter writer;
+ *
+ *   public CSVSink(List<String> colNames) {
+ *     this.header = Joiner.on(",").join(colNames);
+ *   }
+ *
+ *   public void open(WritableByteChannel channel) throws IOException {
+ *     writer = new PrintWriter(Channels.newOutputStream(channel));
+ *     writer.println(header);
+ *   }
+ *
+ *   public void write(List<String> element) throws IOException {
+ *     writer.println(Joiner.on(",").join(element));
+ *   }
+ *
+ *   public void flush() throws IOException {
+ *     writer.flush();
+ *   }
+ * }
+ *
+ * PCollection<BankTransaction> transactions = ...;
+ * // Convert transactions to strings before writing them to the CSV sink.
+ * transactions.apply(MapElements
+ *         .into(lists(strings()))
+ *         .via(tx -> Arrays.asList(tx.getUser(), tx.getAmount())))
+ *     .apply(FileIO.<List<String>>write()
+ *         .via(new CSVSink(Arrays.asList("user", "amount")))
+ *         .to(".../path/to/")
+ *         .withPrefix("transactions")
+ *         .withSuffix(".csv"));
+ * }</pre>
+ *
+ * <h3>Example: Writing CSV files to different directories and with different headers</h3>
+ *
+ * <pre>{@code
+ * enum TransactionType {
+ *   DEPOSIT,
+ *   WITHDRAWAL,
+ *   TRANSFER,
+ *   ...
+ *
+ *   List<String> getFieldNames();
+ *   List<String> getAllFields(BankTransaction tx);
+ * }
+ *
+ * PCollection<BankTransaction> transactions = ...;
+ * transactions.apply(FileIO.<TransactionType, BankTransaction>writeDynamic()
+ *     .by(BankTransaction::getType)
+ *     .via(tx -> tx.getType().getAllFields(tx),  // Convert the data to be written to CSVSink
+ *          type -> new CSVSink(type.getFieldNames()))
+ *     .to(".../path/to/")
+ *     .withNaming(type -> defaultNaming(type + "-transactions", ".csv")));
+ * }</pre>
  */
 public class FileIO {
   private static final Logger LOG = LoggerFactory.getLogger(FileIO.class);
@@ -112,6 +351,27 @@ public static ReadMatches readMatches() {
         .build();
   }
 
+  /** Writes elements to files using a {@link Sink}. See class-level documentation. */
+  public static <InputT> Write<Void, InputT> write() {
+    return new AutoValue_FileIO_Write.Builder<Void, InputT>()
+        .setDynamic(false)
+        .setCompression(Compression.UNCOMPRESSED)
+        .setIgnoreWindowing(false)
+        .build();
+  }
+
+  /**
+   * Writes elements to files using a {@link Sink} and grouping the elements using "dynamic
+   * destinations". See class-level documentation.
+   */
+  public static <DestT, InputT> Write<DestT, InputT> writeDynamic() {
+    return new AutoValue_FileIO_Write.Builder<DestT, InputT>()
+        .setDynamic(true)
+        .setCompression(Compression.UNCOMPRESSED)
+        .setIgnoreWindowing(false)
+        .build();
+  }
+
   /** A utility class for accessing a potentially compressed file. */
   public static final class ReadableFile {
     private final MatchResult.Metadata metadata;
@@ -249,7 +509,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     /** Matches the given filepattern. */
     public Match filepattern(String filepattern) {
-      return this.filepattern(ValueProvider.StaticValueProvider.of(filepattern));
+      return this.filepattern(StaticValueProvider.of(filepattern));
     }
 
     /** Like {@link #filepattern(String)} but using a {@link ValueProvider}. */
@@ -468,4 +728,735 @@ public void process(ProcessContext c) {
       }
     }
   }
+
+  /**
+   * Specifies how to write elements to individual files in {@link FileIO#write} and {@link
+   * FileIO#writeDynamic}. A new instance of {@link Sink} is created for every file being written.
+   */
+  public interface Sink<ElementT> extends Serializable {
+    /**
+     * Initializes writing to the given channel. Will be invoked once on a given {@link Sink}
+     * instance.
+     */
+    void open(WritableByteChannel channel) throws IOException;
+
+    /** Appends a single element to the file. May be invoked zero or more times. */
+    void write(ElementT element) throws IOException;
+
+    /**
+     * Flushes the buffered state (if any) before the channel is closed. Does not need to close the
+     * channel. Will be invoked once.
+     */
+    void flush() throws IOException;
+  }
+
+  /** Implementation of {@link #write} and {@link #writeDynamic}. */
+  @AutoValue
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  public abstract static class Write<DestinationT, UserT>
+      extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
+    /** A policy for generating names for shard files. */
+    interface FileNaming extends Serializable {
+      /**
+       * Generates the filename. MUST use each argument and return different values for
+       * each combination of the arguments.
+       */
+      String getFilename(
+          BoundedWindow window,
+          PaneInfo pane,
+          int numShards,
+          int shardIndex,
+          Compression compression);
+    }
+
+    public static FileNaming defaultNaming(
+        final String prefix, final String suffix) {
+      return defaultNaming(StaticValueProvider.of(prefix), StaticValueProvider.of(suffix));
+    }
+
+    public static FileNaming defaultNaming(
+        final ValueProvider<String> prefix, final ValueProvider<String> suffix) {
+      return new FileNaming() {
+        @Override
+        public String getFilename(
+            BoundedWindow window,
+            PaneInfo pane,
+            int numShards,
+            int shardIndex,
+            Compression compression) {
+          checkArgument(window != null, "window can not be null");
+          checkArgument(pane != null, "pane can not be null");
+          checkArgument(compression != null, "compression can not be null");
+          StringBuilder res = new StringBuilder(prefix.get());
+          if (window != GlobalWindow.INSTANCE) {
+            if (res.length() > 0) {
+              res.append("-");
+            }
+            checkArgument(
+                window instanceof IntervalWindow,
+                "defaultNaming() supports only windows of type %s, "
+                    + "but got window %s of type %s",
+                IntervalWindow.class.getSimpleName(),
+                window,
+                window.getClass().getSimpleName());
+            IntervalWindow iw = (IntervalWindow) window;
+            res.append(iw.start().toString()).append("-").append(iw.end().toString());
+          }
+          boolean isOnlyFiring = pane.isFirst() && pane.isLast();
+          if (!isOnlyFiring) {
+            if (res.length() > 0) {
+              res.append("-");
+            }
+            res.append(pane.getIndex());
+          }
+          if (res.length() > 0) {
+            res.append("-");
+          }
+          String numShardsStr = String.valueOf(numShards);
+          // A trillion shards per window per pane ought to be enough for everybody.
+          DecimalFormat df =
+              new DecimalFormat("000000000000".substring(0, Math.max(5, numShardsStr.length())));
+          res.append(df.format(shardIndex)).append("-of-").append(df.format(numShards));
+          res.append(suffix.get());
+          res.append(compression.getSuggestedSuffix());
+          return res.toString();
+        }
+      };
+    }
+
+    public static FileNaming relativeFileNaming(
+        final ValueProvider<String> baseDirectory, final FileNaming innerNaming) {
+      return new FileNaming() {
+        @Override
+        public String getFilename(
+            BoundedWindow window,
+            PaneInfo pane,
+            int numShards,
+            int shardIndex,
+            Compression compression) {
+          return FileSystems.matchNewResource(baseDirectory.get(), true /* isDirectory */)
+              .resolve(
+                  innerNaming.getFilename(window, pane, numShards, shardIndex, compression),
+                  RESOLVE_FILE)
+              .toString();
+        }
+      };
+    }
+
+    abstract boolean getDynamic();
+
+    @Nullable
+    abstract Contextful<Fn<DestinationT, Sink<?>>> getSinkFn();
+
+    @Nullable
+    abstract Contextful<Fn<UserT, ?>> getOutputFn();
+
+    @Nullable
+    abstract Contextful<Fn<UserT, DestinationT>> getDestinationFn();
+
+    @Nullable
+    abstract ValueProvider<String> getOutputDirectory();
+
+    @Nullable
+    abstract ValueProvider<String> getFilenamePrefix();
+
+    @Nullable
+    abstract ValueProvider<String> getFilenameSuffix();
+
+    @Nullable
+    abstract FileNaming getConstantFileNaming();
+
+    @Nullable
+    abstract Contextful<Fn<DestinationT, FileNaming>> getFileNamingFn();
+
+    @Nullable
+    abstract DestinationT getEmptyWindowDestination();
+
+    @Nullable
+    abstract Coder<DestinationT> getDestinationCoder();
+
+    @Nullable
+    abstract ValueProvider<String> getTempDirectory();
+
+    abstract Compression getCompression();
+
+    @Nullable
+    abstract ValueProvider<Integer> getNumShards();
+
+    @Nullable
+    abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding();
+
+    abstract boolean getIgnoreWindowing();
+
+    abstract Builder<DestinationT, UserT> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<DestinationT, UserT> {
+      abstract Builder<DestinationT, UserT> setDynamic(boolean dynamic);
+
+      abstract Builder<DestinationT, UserT> setSinkFn(
+          Contextful<Fn<DestinationT, Sink<?>>> sink);
+
+      abstract Builder<DestinationT, UserT> setOutputFn(
+          Contextful<Fn<UserT, ?>> outputFn);
+
+      abstract Builder<DestinationT, UserT> setDestinationFn(
+          Contextful<Fn<UserT, DestinationT>> destinationFn);
+
+      abstract Builder<DestinationT, UserT> setOutputDirectory(
+          ValueProvider<String> outputDirectory);
+
+      abstract Builder<DestinationT, UserT> setFilenamePrefix(ValueProvider<String> filenamePrefix);
+
+      abstract Builder<DestinationT, UserT> setFilenameSuffix(ValueProvider<String> filenameSuffix);
+
+      abstract Builder<DestinationT, UserT> setConstantFileNaming(FileNaming constantFileNaming);
+
+      abstract Builder<DestinationT, UserT> setFileNamingFn(
+          Contextful<Fn<DestinationT, FileNaming>> namingFn);
+
+      abstract Builder<DestinationT, UserT> setEmptyWindowDestination(
+          DestinationT emptyWindowDestination);
+
+      abstract Builder<DestinationT, UserT> setDestinationCoder(
+          Coder<DestinationT> destinationCoder);
+
+      abstract Builder<DestinationT, UserT> setTempDirectory(
+          ValueProvider<String> tempDirectoryProvider);
+
+      abstract Builder<DestinationT, UserT> setCompression(Compression compression);
+
+      abstract Builder<DestinationT, UserT> setNumShards(
+          @Nullable ValueProvider<Integer> numShards);
+
+      abstract Builder<DestinationT, UserT> setSharding(
+          PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding);
+
+      abstract Builder<DestinationT, UserT> setIgnoreWindowing(boolean ignoreWindowing);
+
+      abstract Write<DestinationT, UserT> build();
+    }
+
+    /** Specifies how to partition elements into groups ("destinations"). */
+    public Write<DestinationT, UserT> by(
+        SerializableFunction<UserT, DestinationT> destinationFn) {
+      checkArgument(destinationFn != null, "destinationFn can not be null");
+      return by(fn(destinationFn));
+    }
+
+    /** Like {@link #by}, but with access to context such as side inputs. */
+    public Write<DestinationT, UserT> by(
+        Contextful<Fn<UserT, DestinationT>> destinationFn) {
+      checkArgument(destinationFn != null, "destinationFn can not be null");
+      return toBuilder().setDestinationFn(destinationFn).build();
+    }
+
+    /**
+     * Specifies how to create a {@link Sink} for a particular destination and how to map the
+     * element type to the sink's output type. The sink function must create a new {@link Sink}
+     * instance every time it is called.
+     */
+    public <OutputT> Write<DestinationT, UserT> via(
+        Contextful<Fn<UserT, OutputT>> outputFn,
+        Contextful<Fn<DestinationT, Sink<OutputT>>> sinkFn) {
+      checkArgument(sinkFn != null, "sinkFn can not be null");
+      checkArgument(outputFn != null, "outputFn can not be null");
+      return toBuilder()
+        .setSinkFn((Contextful) sinkFn)
+        .setOutputFn(outputFn).build();
+    }
+
+    /**
+     * Like {@link #via(Contextful, Contextful)}, but uses the same sink for all
+     * destinations.
+     */
+    public <OutputT> Write<DestinationT, UserT> via(
+        Contextful<Fn<UserT, OutputT>> outputFn, final Sink<OutputT> sink) {
+      checkArgument(sink != null, "sink can not be null");
+      checkArgument(outputFn != null, "outputFn can not be null");
+      return via(
+          outputFn,
+          fn(SerializableFunctions.<DestinationT, Sink<OutputT>>clonesOf(sink)));
+    }
+
+    /**
+     * Like {@link #via(Contextful, Contextful)}, but the output type of the sink is the same as the
+     * type of the input collection. The sink function must create a new {@link Sink} instance every
+     * time it is called.
+     */
+    public Write<DestinationT, UserT> via(Contextful<Fn<DestinationT, Sink<UserT>>> sinkFn) {
+      checkArgument(sinkFn != null, "sinkFn can not be null");
+      return toBuilder()
+          .setSinkFn((Contextful) sinkFn)
+          .setOutputFn(fn(SerializableFunctions.<UserT>identity()))
+          .build();
+    }
+
+    /**
+     * Like {@link #via(Contextful)}, but uses the same {@link Sink} for all destinations.
+     */
+    public Write<DestinationT, UserT> via(Sink<UserT> sink) {
+      checkArgument(sink != null, "sink can not be null");
+      return via(fn(SerializableFunctions.<DestinationT, Sink<UserT>>clonesOf(sink)));
+    }
+
+    /**
+     * Specifies a common directory for all generated files. A temporary generated sub-directory of
+     * this directory will be used as the temp directory, unless overridden by {@link
+     * #withTempDirectory}.
+     */
+    public Write<DestinationT, UserT> to(String directory) {
+      checkArgument(directory != null, "directory can not be null");
+      return to(StaticValueProvider.of(directory));
+    }
+
+    /** Like {@link #to(String)} but with a {@link ValueProvider}. */
+    public Write<DestinationT, UserT> to(ValueProvider<String> directory) {
+      checkArgument(directory != null, "directory can not be null");
+      return toBuilder().setOutputDirectory(directory).build();
+    }
+
+    /**
+     * Specifies a common prefix to use for all generated filenames, if using the default file
+     * naming. Incompatible with {@link #withNaming}.
+     */
+    public Write<DestinationT, UserT> withPrefix(String prefix) {
+      checkArgument(prefix != null, "prefix can not be null");
+      return withPrefix(StaticValueProvider.of(prefix));
+    }
+
+    /** Like {@link #withPrefix(String)} but with a {@link ValueProvider}. */
+    public Write<DestinationT, UserT> withPrefix(ValueProvider<String> prefix) {
+      checkArgument(prefix != null, "prefix can not be null");
+      return toBuilder().setFilenamePrefix(prefix).build();
+    }
+
+    /**
+     * Specifies a common suffix to use for all generated filenames, if using the default file
+     * naming. Incompatible with {@link #withNaming}.
+     */
+    public Write<DestinationT, UserT> withSuffix(String suffix) {
+      checkArgument(suffix != null, "suffix can not be null");
+      return withSuffix(StaticValueProvider.of(suffix));
+    }
+
+    /** Like {@link #withSuffix(String)} but with a {@link ValueProvider}. */
+    public Write<DestinationT, UserT> withSuffix(ValueProvider<String> suffix) {
+      checkArgument(suffix != null, "suffix can not be null");
+      return toBuilder().setFilenameSuffix(suffix).build();
+    }
+
+    /**
+     * Specifies a custom strategy for generating filenames. All generated filenames will be
+     * resolved relative to the directory specified in {@link #to}, if any.
+     *
+     * <p>Incompatible with {@link #withPrefix} and {@link #withSuffix}.
+     *
+     * <p>This can only be used in combination with {@link #write()} but not {@link
+     * #writeDynamic()}.
+     */
+    public Write<DestinationT, UserT> withNaming(FileNaming naming) {
+      checkArgument(naming != null, "naming can not be null");
+      return toBuilder().setConstantFileNaming(naming).build();
+    }
+
+    /**
+     * Specifies a custom strategy for generating filenames depending on the destination, similar to
+     * {@link #withNaming(FileNaming)}.
+     *
+     * <p>This can only be used in combination with {@link #writeDynamic()} but not {@link
+     * #write()}.
+     */
+    public Write<DestinationT, UserT> withNaming(
+        SerializableFunction<DestinationT, FileNaming> namingFn) {
+      checkArgument(namingFn != null, "namingFn can not be null");
+      return withNaming(Contextful.fn(namingFn));
+    }
+
+    /**
+     * Like {@link #withNaming(SerializableFunction)} but allows accessing context, such as side
+     * inputs, from the function.
+     */
+    public Write<DestinationT, UserT> withNaming(
+        Contextful<Fn<DestinationT, FileNaming>> namingFn) {
+      checkArgument(namingFn != null, "namingFn can not be null");
+      return toBuilder().setFileNamingFn(namingFn).build();
+    }
+
+    /** Specifies a directory into which all temporary files will be placed. */
+    public Write<DestinationT, UserT> withTempDirectory(
+        String tempDirectory) {
+      checkArgument(tempDirectory != null, "tempDirectory can not be null");
+      return withTempDirectory(StaticValueProvider.of(tempDirectory));
+    }
+
+    /** Like {@link #withTempDirectory(String)}. */
+    public Write<DestinationT, UserT> withTempDirectory(
+        ValueProvider<String> tempDirectory) {
+      checkArgument(tempDirectory != null, "tempDirectory can not be null");
+      return toBuilder().setTempDirectory(tempDirectory).build();
+    }
+
+    /**
+     * Specifies to compress all generated shard files using the given {@link Compression} and, by
+     * default, append the respective extension to the filename.
+     */
+    public Write<DestinationT, UserT> withCompression(Compression compression) {
+      checkArgument(compression != null, "compression can not be null");
+      checkArgument(
+          compression != Compression.AUTO, "AUTO compression is not supported for writing");
+      return toBuilder().setCompression(compression).build();
+    }
+
+    /**
+     * If {@link #withIgnoreWindowing()} is specified, specifies a destination to be used in case
+     * the collection is empty, to generate the (only, empty) output file.
+     */
+    public Write<DestinationT, UserT> withEmptyGlobalWindowDestination(
+        DestinationT emptyWindowDestination) {
+      return toBuilder().setEmptyWindowDestination(emptyWindowDestination).build();
+    }
+
+    /**
+     * Specifies a {@link Coder} for the destination type, if it can not be inferred from {@link
+     * #by}.
+     */
+    public Write<DestinationT, UserT> withDestinationCoder(Coder<DestinationT> destinationCoder) {
+      checkArgument(destinationCoder != null, "destinationCoder can not be null");
+      return toBuilder().setDestinationCoder(destinationCoder).build();
+    }
+
+    /**
+     * Specifies to use a given fixed number of shards per window. 0 means runner-determined
+     * sharding. Specifying a non-zero value may hurt performance, because it will limit the
+     * parallelism of writing and will introduce an extra {@link GroupByKey} operation.
+     */
+    public Write<DestinationT, UserT> withNumShards(int numShards) {
+      checkArgument(numShards >= 0, "numShards must be non-negative, but was: %s", numShards);
+      if (numShards == 0) {
+        return withNumShards(null);
+      }
+      return withNumShards(StaticValueProvider.of(numShards));
+    }
+
+    /**
+     * Like {@link #withNumShards(int)}. Specifying {@code null} means runner-determined sharding.
+     */
+    public Write<DestinationT, UserT> withNumShards(@Nullable ValueProvider<Integer> numShards) {
+      return toBuilder().setNumShards(numShards).build();
+    }
+
+    /**
+     * Specifies a {@link PTransform} to use for computing the desired number of shards in each
+     * window.
+     */
+    public Write<DestinationT, UserT> withSharding(
+        PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
+      checkArgument(sharding != null, "sharding can not be null");
+      return toBuilder().setSharding(sharding).build();
+    }
+
+    /**
+     * Specifies to ignore windowing information in the input, and instead rewindow it to global
+     * window with the default trigger.
+     *
+     * @deprecated Avoid usage of this method: its effects are complex and it will be removed in
+     *     future versions of Beam. Right now it exists for compatibility with {@link WriteFiles}.
+     */
+    @Deprecated
+    public Write<DestinationT, UserT> withIgnoreWindowing() {
+      return toBuilder().setIgnoreWindowing(true).build();
+    }
+
+    @Override
+    public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
+      Write.Builder<DestinationT, UserT> resolvedSpec = new AutoValue_FileIO_Write.Builder<>();
+
+      resolvedSpec.setDynamic(getDynamic());
+
+      checkArgument(getSinkFn() != null, ".via() is required");
+      resolvedSpec.setSinkFn(getSinkFn());
+
+      checkArgument(getOutputFn() != null, "outputFn should have been set by .via()");
+      resolvedSpec.setOutputFn(getOutputFn());
+
+      // Resolve destinationFn
+      if (getDynamic()) {
+        checkArgument(getDestinationFn() != null, "when using writeDynamic(), .by() is required");
+        resolvedSpec.setDestinationFn(getDestinationFn());
+        resolvedSpec.setDestinationCoder(resolveDestinationCoder(input));
+      } else {
+        checkArgument(getDestinationFn() == null, ".by() requires writeDynamic()");
+        checkArgument(
+            getDestinationCoder() == null, ".withDestinationCoder() requires writeDynamic()");
+        resolvedSpec.setDestinationFn(
+            fn(SerializableFunctions.<UserT, DestinationT>constant(null)));
+        resolvedSpec.setDestinationCoder((Coder) VoidCoder.of());
+      }
+
+      // Resolve fileNamingFn
+      Contextful<Fn<DestinationT, FileNaming>> fileNamingFn;
+      if (getDynamic()) {
+        checkArgument(
+            getConstantFileNaming() == null,
+            "when using writeDynamic(), must use versions of .withNaming() "
+                + "that take functions from DestinationT");
+        checkArgument(getFilenamePrefix() == null, ".withPrefix() requires write()");
+        checkArgument(getFilenameSuffix() == null, ".withSuffix() requires write()");
+        checkArgument(
+            getFileNamingFn() != null,
+            "when using writeDynamic(), must specify "
+                + ".withNaming() taking a function from DestinationT");
+        fileNamingFn =
+            Contextful.fn(
+                new Fn<DestinationT, FileNaming>() {
+                  @Override
+                  public FileNaming apply(DestinationT element, Context c) throws Exception {
+                    FileNaming naming = getFileNamingFn().getClosure().apply(element, c);
+                    return getOutputDirectory() == null
+                        ? naming
+                        : relativeFileNaming(getOutputDirectory(), naming);
+                  }
+                },
+                getFileNamingFn().getRequirements());
+      } else {
+        checkArgument(getFileNamingFn() == null,
+            ".withNaming() taking a function from DestinationT requires writeDynamic()");
+        FileNaming constantFileNaming;
+        if (getConstantFileNaming() == null) {
+          constantFileNaming = defaultNaming(
+              MoreObjects.firstNonNull(
+                  getFilenamePrefix(), StaticValueProvider.of("output")),
+              MoreObjects.firstNonNull(getFilenameSuffix(), StaticValueProvider.of("")));
+          if (getOutputDirectory() != null) {
+            constantFileNaming = relativeFileNaming(getOutputDirectory(), constantFileNaming);
+          }
+        } else {
+          checkArgument(
+              getFilenamePrefix() == null, ".withNaming(FileNaming) is incompatible with .withPrefix()");
+          checkArgument(
+              getFilenameSuffix() == null, ".withNaming(FileNaming) is incompatible with .withSuffix()");
+          constantFileNaming = getConstantFileNaming();
+        }
+        fileNamingFn =
+            fn(SerializableFunctions.<DestinationT, FileNaming>constant(constantFileNaming));
+      }
+
+      resolvedSpec.setFileNamingFn(fileNamingFn);
+      resolvedSpec.setEmptyWindowDestination(getEmptyWindowDestination());
+      if (getTempDirectory() == null) {
+        checkArgument(
+            getOutputDirectory() != null, "must specify either .withTempDirectory() or .to()");
+        resolvedSpec.setTempDirectory(getOutputDirectory());
+      } else {
+        resolvedSpec.setTempDirectory(getTempDirectory());
+      }
+
+      resolvedSpec.setCompression(getCompression());
+      resolvedSpec.setNumShards(getNumShards());
+      resolvedSpec.setSharding(getSharding());
+      resolvedSpec.setIgnoreWindowing(getIgnoreWindowing());
+
+      Write<DestinationT, UserT> resolved = resolvedSpec.build();
+      WriteFiles<UserT, DestinationT, ?> writeFiles =
+          WriteFiles.to(new ViaFileBasedSink<>(resolved))
+              .withSideInputs(Lists.newArrayList(resolved.getAllSideInputs()));
+      if (getNumShards() != null) {
+        writeFiles = writeFiles.withNumShards(getNumShards());
+      } else if (getSharding() != null) {
+        writeFiles = writeFiles.withSharding(getSharding());
+      } else {
+        writeFiles = writeFiles.withRunnerDeterminedSharding();
+      }
+      if (!getIgnoreWindowing()) {
+        writeFiles = writeFiles.withWindowedWrites();
+      }
+      return input.apply(writeFiles);
+    }
+
+    private Coder<DestinationT> resolveDestinationCoder(PCollection<UserT> input) {
+      Coder<DestinationT> destinationCoder = getDestinationCoder();
+      if (destinationCoder == null) {
+        TypeDescriptor<DestinationT> destinationT =
+            TypeDescriptors.outputOf(getDestinationFn().getClosure());
+        try {
+          destinationCoder =
+              input
+                  .getPipeline()
+                  .getCoderRegistry()
+                  .getCoder(destinationT);
+        } catch (CannotProvideCoderException e) {
+          throw new IllegalArgumentException(
+              "Unable to infer a coder for destination type (inferred from .by() as \""
+                  + destinationT
+                  + "\") - specify it explicitly using .withDestinationCoder()");
+        }
+      }
+      return destinationCoder;
+    }
+
+    private Collection<PCollectionView<?>> getAllSideInputs() {
+      return Requirements.union(
+              getDestinationFn(), getOutputFn(), getSinkFn(), getFileNamingFn())
+          .getSideInputs();
+    }
+
+    private static class ViaFileBasedSink<UserT, DestinationT, OutputT>
+        extends FileBasedSink<UserT, DestinationT, OutputT> {
+      private final Write<DestinationT, UserT> spec;
+
+      private ViaFileBasedSink(
+          Write<DestinationT, UserT> spec) {
+        super(
+            ValueProvider.NestedValueProvider.of(
+                spec.getTempDirectory(),
+                new SerializableFunction<String, ResourceId>() {
+                  @Override
+                  public ResourceId apply(String input) {
+                    return FileSystems.matchNewResource(input, true /* isDirectory */);
+                  }
+                }),
+            new DynamicDestinationsAdapter<UserT, DestinationT, OutputT>(spec),
+            spec.getCompression());
+        this.spec = spec;
+      }
+
+      @Override
+      public WriteOperation<DestinationT, OutputT> createWriteOperation() {
+        return new WriteOperation<DestinationT, OutputT>(this) {
+          @Override
+          public Writer<DestinationT, OutputT> createWriter() throws Exception {
+            return new Writer<DestinationT, OutputT>(this, "") {
+              @Nullable private Sink<OutputT> sink;
+
+              @Override
+              protected void prepareWrite(WritableByteChannel channel) throws Exception {
+                Fn<DestinationT, Sink<OutputT>> sinkFn = (Fn) spec.getSinkFn().getClosure();
+                sink =
+                    sinkFn.apply(
+                        getDestination(),
+                        new Fn.Context() {
+                          @Override
+                          public <T> T sideInput(PCollectionView<T> view) {
+                            return getWriteOperation()
+                                .getSink()
+                                .getDynamicDestinations()
+                                .sideInput(view);
+                          }
+                        });
+                sink.open(channel);
+              }
+
+              @Override
+              public void write(OutputT value) throws Exception {
+                sink.write(value);
+              }
+
+              @Override
+              protected void finishWrite() throws Exception {
+                sink.flush();
+              }
+            };
+          }
+        };
+      }
+
+      private static class DynamicDestinationsAdapter<UserT, DestinationT, OutputT>
+          extends DynamicDestinations<UserT, DestinationT, OutputT> {
+        private final Write<DestinationT, UserT> spec;
+        @Nullable private transient Fn.Context context;
+
+        private DynamicDestinationsAdapter(Write<DestinationT, UserT> spec) {
+          this.spec = spec;
+        }
+
+        private Fn.Context getContext() {
+          if (context == null) {
+            context = new Fn.Context() {
+              @Override
+              public <T> T sideInput(PCollectionView<T> view) {
+                return DynamicDestinationsAdapter.this.sideInput(view);
+              }
+            };
+          }
+          return context;
+        }
+
+        @Override
+        public OutputT formatRecord(UserT record) {
+          try {
+            return ((Fn<UserT, OutputT>) spec.getOutputFn().getClosure())
+                .apply(record, getContext());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public DestinationT getDestination(UserT element) {
+          try {
+            return spec.getDestinationFn().getClosure().apply(element, getContext());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public DestinationT getDefaultDestination() {
+          return spec.getEmptyWindowDestination();
+        }
+
+        @Override
+        public FilenamePolicy getFilenamePolicy(final DestinationT destination) {
+          final FileNaming namingFn;
+          try {
+            namingFn = spec.getFileNamingFn().getClosure().apply(destination, getContext());
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          return new FilenamePolicy() {
+            @Override
+            public ResourceId windowedFilename(
+                int shardNumber,
+                int numShards,
+                BoundedWindow window,
+                PaneInfo paneInfo,
+                OutputFileHints outputFileHints) {
+              // We ignore outputFileHints because we control the FileBasedSink, so the
+              // hints always match spec.getCompression().
+              return FileSystems.matchNewResource(
+                  namingFn.getFilename(
+                      window, paneInfo, numShards, shardNumber, spec.getCompression()),
+                  false /* isDirectory */);
+            }
+
+            @Nullable
+            @Override
+            public ResourceId unwindowedFilename(
+                int shardNumber, int numShards, OutputFileHints outputFileHints) {
+              return FileSystems.matchNewResource(
+                  namingFn.getFilename(
+                      GlobalWindow.INSTANCE,
+                      PaneInfo.NO_FIRING,
+                      numShards,
+                      shardNumber,
+                      spec.getCompression()),
+                  false /* isDirectory */);
+            }
+          };
+        }
+
+        @Override
+        public List<PCollectionView<?>> getSideInputs() {
+          return Lists.newArrayList(spec.getAllSideInputs());
+        }
+
+        @Nullable
+        @Override
+        public Coder<DestinationT> getDestinationCoder() {
+          return spec.getDestinationCoder();
+        }
+      }
+    }
+  }
 }
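
The adapter classes above are what let a single FileIO.writeDynamic() expression route
records to per-destination files, with resolveDestinationCoder() falling back to coder
inference from the .by() closure. As a rough sketch of the user-facing shape this enables
(the element type, paths, and the lambda overloads of by()/via()/withNaming() are
illustrative, following the javadoc example and tests further down in this diff;
withDestinationCoder() is the method named in the error message above):

    PCollection<String> words = ...;
    words.apply(
        FileIO.<String, String>writeDynamic()
            // Destination = first letter of each record.
            .by(word -> word.substring(0, 1))
            // Only needed when a coder cannot be inferred from .by().
            .withDestinationCoder(StringUtf8Coder.of())
            // Each record is written as one text line.
            .via(TextIO.sink(), SerializableFunctions.<String>identity())
            .to("/tmp/words")                 // placeholder output directory
            .withNaming(letter -> FileIO.Write.defaultNaming("words-" + letter, ".txt"))
            .withTempDirectory("/tmp/words")
            .withIgnoreWindowing());
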
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 865875de4b0..e16a43d35f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -27,8 +27,11 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -147,43 +150,25 @@
  * <p>If you want better control over how filenames are generated than the default policy allows, a
  * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
  *
- * <h3>Writing windowed or unbounded data</h3>
- *
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may need not set it yourself. If setting an explicit template using {@link
- * TextIO.Write#withShardNameTemplate(String)}, make sure that the template contains placeholders
- * for the window and the pane; W is expanded into the window text, and P into the pane; the default
- * template will include both the window and the pane in the filename.
+ * <h3>Advanced features</h3>
  *
- * <h3>Writing data to multiple destinations</h3>
+ * <p>{@link TextIO} supports all features of {@link FileIO#write} and {@link FileIO#writeDynamic},
+ * such as writing windowed/unbounded data, writing data to multiple destinations, and so on, by
+ * providing a {@link Sink} via {@link #sink()}.
  *
- * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
- * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
- * allows you to convert any input value into a custom destination object, and map that destination
- * object to a {@link FilenamePolicy}. This allows using different filename policies (or more
- * commonly, differently-configured instances of the same policy) based on the input record. Often
- * this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link
- * DynamicDestinations} object to examine the input type and takes a format function to convert that
- * type to a string for writing.
- *
- * <p>A convenience shortcut is provided for the case where the default naming policy is used, but
- * different configurations of this policy are wanted based on the input record. Default naming
- * policies can be configured using the {@link DefaultFilenamePolicy.Params} object.
+ * <p>For example, to write events of different types to different filenames:
  *
  * <pre>{@code
- * PCollection<UserEvent>> lines = ...;
- * lines.apply(TextIO.<UserEvent>writeCustomType(new FormatEvent())
- *      .to(new SerializableFunction<UserEvent, Params>() {
- *         public String apply(UserEvent value) {
- *           return new Params().withBaseFilename(baseDirectory + "/" + value.country());
- *         }
- *       }),
- *       new Params().withBaseFilename(baseDirectory + "/empty");
+ *   PCollection<Event> events = ...;
+ *   events.apply(FileIO.<EventType, Event>writeDynamic()
+ *         .by(Event::getType)
+ *         .via(TextIO.sink(), Event::toString)
+ *         .to(type -> nameFilesUsingWindowPaneAndShard(".../events/" + type + "/data", ".txt")));
  * }</pre>
+ *
+ * <p>For backwards compatibility, {@link TextIO} also supports the legacy
+ * {@link DynamicDestinations} interface for advanced features via {@link
+ * Write#to(DynamicDestinations)}.
  */
 public class TextIO {
   /**
@@ -722,7 +707,11 @@ public ResourceId apply(String input) {
      * Use a {@link DynamicDestinations} object to vend {@link FilenamePolicy} objects. These
      * objects can examine the input record when creating a {@link FilenamePolicy}. A directory for
      * temporary files must be specified using {@link #withTempDirectory}.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
+     *     instead.
      */
+    @Deprecated
     public <NewDestinationT> TypedWrite<UserT, NewDestinationT> to(
         DynamicDestinations<UserT, DestinationT, String> dynamicDestinations) {
       return (TypedWrite) toBuilder().setDynamicDestinations(dynamicDestinations).build();
@@ -734,7 +723,11 @@ public ResourceId apply(String input) {
      * records should be written (base filename, file suffix, and shard template). The
      * emptyDestination parameter specifies where empty files should be written when the written
      * {@link PCollection} is empty.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
+     *     instead.
      */
+    @Deprecated
     public TypedWrite<UserT, Params> to(
         SerializableFunction<UserT, Params> destinationFunction, Params emptyDestination) {
       return (TypedWrite) toBuilder()
@@ -753,7 +746,11 @@ public ResourceId apply(String input) {
      * Specifies a format function to convert {@link UserT} to the output type. If {@link
      * #to(DynamicDestinations)} is used, {@link DynamicDestinations#formatRecord(Object)} must be
      * used instead.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link #sink()}
+     *     instead.
      */
+    @Deprecated
     public TypedWrite<UserT, DestinationT> withFormatFunction(
         @Nullable SerializableFunction<UserT, String> formatFunction) {
       return toBuilder().setFormatFunction(formatFunction).build();
@@ -1026,15 +1023,27 @@ public Write to(FilenamePolicy filenamePolicy) {
           inner.to(filenamePolicy).withFormatFunction(SerializableFunctions.<String>identity()));
     }
 
-    /** See {@link TypedWrite#to(DynamicDestinations)}. */
+    /**
+     * See {@link TypedWrite#to(DynamicDestinations)}.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link
+     *     #sink()} instead.
+     */
     @Experimental(Kind.FILESYSTEM)
+    @Deprecated
     public Write to(DynamicDestinations<String, ?, String> dynamicDestinations) {
       return new Write(
           inner.to((DynamicDestinations) dynamicDestinations).withFormatFunction(null));
     }
 
-    /** See {@link TypedWrite#to(SerializableFunction, Params)}. */
+    /**
+     * See {@link TypedWrite#to(SerializableFunction, Params)}.
+     *
+     * @deprecated Use {@link FileIO#write()} or {@link FileIO#writeDynamic()} with {@link
+     *     #sink()} instead.
+     */
     @Experimental(Kind.FILESYSTEM)
+    @Deprecated
     public Write to(
         SerializableFunction<String, Params> destinationFunction, Params emptyDestination) {
       return new Write(
@@ -1159,6 +1168,62 @@ public boolean matches(String filename) {
 
   //////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * Creates a {@link Sink} that writes newline-delimited strings in UTF-8, for use with {@link
+   * FileIO#write}.
+   */
+  public static Sink sink() {
+    return new AutoValue_TextIO_Sink.Builder().build();
+  }
+
+  /** Implementation of {@link #sink}. */
+  @AutoValue
+  public abstract static class Sink implements FileIO.Sink<String> {
+    @Nullable abstract String getHeader();
+    @Nullable abstract String getFooter();
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setHeader(String header);
+      abstract Builder setFooter(String footer);
+      abstract Sink build();
+    }
+
+    public Sink withHeader(String header) {
+      checkArgument(header != null, "header can not be null");
+      return toBuilder().setHeader(header).build();
+    }
+
+    public Sink withFooter(String footer) {
+      checkArgument(footer != null, "footer can not be null");
+      return toBuilder().setFooter(footer).build();
+    }
+
+    @Nullable private transient PrintWriter writer;
+
+    @Override
+    public void open(WritableByteChannel channel) throws IOException {
+      writer = new PrintWriter(Channels.newOutputStream(channel));
+      if (getHeader() != null) {
+        writer.println(getHeader());
+      }
+    }
+
+    @Override
+    public void write(String element) throws IOException {
+      writer.println(element);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      if (getFooter() != null) {
+        writer.println(getFooter());
+      }
+      writer.close();
+    }
+  }
+
   /** Disable construction of utility class. */
   private TextIO() {}
 }
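
For a sense of how the new TextIO.sink() composes with FileIO.write(), here is a minimal
sketch modeled on the testWriteViaSink and testSink tests further down in this diff (the
output directory is a placeholder):

    PCollection<String> lines = ...;
    lines.apply(
        FileIO.<String>write()
            .via(TextIO.sink().withHeader("header").withFooter("footer"))
            .to("/tmp/lines")                 // placeholder output directory
            .withSuffix(".txt")
            .withIgnoreWindowing());

The header is written by open() before any element and the footer by flush() after the
last one, so every shard file carries its own copy of both.
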
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 499a1940020..9f219bd7327 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,7 +24,6 @@
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -624,14 +623,15 @@ private WriteShardedBundlesToTempFiles(
 
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(getSideInputs());
+      if (numShardsView != null) {
+        shardingSideInputs.add(numShardsView);
+      }
       return input
           .apply(
               "ApplyShardingKey",
               ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
-                  .withSideInputs(
-                      (numShardsView == null)
-                          ? ImmutableList.<PCollectionView<Integer>>of()
-                          : ImmutableList.of(numShardsView)))
+                  .withSideInputs(shardingSideInputs))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
           .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
           .apply(
@@ -656,6 +656,7 @@ private WriteShardedBundlesToTempFiles(
 
     @ProcessElement
     public void processElement(ProcessContext context) throws IOException {
+      getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
       final int shardCount;
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
index f90e8f3aeb9..ac59cda0394 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import com.google.common.collect.Sets;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -58,4 +60,12 @@ public static Requirements empty() {
   public boolean isEmpty() {
     return sideInputs.isEmpty();
   }
+
+  public static Requirements union(Contextful... contextfuls) {
+    Set<PCollectionView<?>> sideInputs = Sets.newHashSet();
+    for (Contextful c : contextfuls) {
+      sideInputs.addAll(c.getRequirements().getSideInputs());
+    }
+    return requiresSideInputs(sideInputs);
+  }
 }
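
Requirements.union() merges the side-input requirements declared by several Contextful
closures; this is how FileIO.Write.getAllSideInputs() above gathers the views needed by
its by()/via()/withNaming() callbacks. A small hedged sketch (prefixView is a placeholder
side input):

    PCollectionView<String> prefixView = ...;
    Contextful<Contextful.Fn<String, String>> destinationFn =
        Contextful.fn(
            new Contextful.Fn<String, String>() {
              @Override
              public String apply(String element, Context c) {
                // Reads the side input, so it must be declared in the Requirements.
                return c.sideInput(prefixView) + element.substring(0, 1);
              }
            },
            Requirements.requiresSideInputs(prefixView));
    Contextful<Contextful.Fn<String, String>> formatFn =
        Contextful.fn(
            new Contextful.Fn<String, String>() {
              @Override
              public String apply(String element, Context c) {
                return element;
              }
            },
            Requirements.empty());
    // The union is the de-duplicated set of views required by either closure.
    Collection<PCollectionView<?>> sideInputs =
        Requirements.union(destinationFn, formatFn).getSideInputs();
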
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
index d057d81eb45..134a7498246 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
@@ -18,8 +18,22 @@
 
 package org.apache.beam.sdk.transforms;
 
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.util.SerializableUtils;
+
 /** Useful {@link SerializableFunction} overrides. */
 public class SerializableFunctions {
+  public static <InT, OutT extends Serializable> SerializableFunction<InT, OutT> clonesOf(
+      final OutT base) {
+    return new SerializableFunction<InT, OutT>() {
+      @Override
+      public OutT apply(InT input) {
+        return SerializableUtils.clone(base);
+      }
+    };
+  }
+
   private static class Identity<T> implements SerializableFunction<T, T> {
     @Override
     public T apply(T input) {
@@ -28,9 +42,9 @@ public T apply(T input) {
   }
 
   private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> {
-    OutT value;
+    @Nullable OutT value;
 
-    Constant(OutT value) {
+    Constant(@Nullable OutT value) {
       this.value = value;
     }
 
@@ -44,7 +58,7 @@ public OutT apply(InT input) {
     return new Identity<>();
   }
 
-  public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) {
+  public static <InT, OutT> SerializableFunction<InT, OutT> constant(@Nullable OutT value) {
     return new Constant<>(value);
   }
 }
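
The new clonesOf() helper returns a function that hands out an independent deep copy of a
Serializable prototype on every call, for example so that a single configured FileIO.Sink
can be cloned into a fresh instance per output file. A hedged sketch, assuming (as the
rest of this patch does) that sink instances are Serializable:

    SerializableFunction<String, TextIO.Sink> freshSinks =
        SerializableFunctions.clonesOf(TextIO.sink().withHeader("header"));
    // Each call round-trips the prototype through serialization, so the two sinks
    // below are independent objects that can each hold their own open PrintWriter.
    TextIO.Sink sinkForFileA = freshSinks.apply("destination-a");
    TextIO.Sink sinkForFileB = freshSinks.apply("destination-b");
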
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 239c9f45222..04893b9f8cf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -20,6 +20,8 @@
 import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 import static org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions.RESOLVE_FILE;
+import static org.apache.beam.sdk.transforms.Contextful.fn;
+import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
@@ -73,8 +75,10 @@
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
@@ -112,6 +116,8 @@
   @Rule
   public transient TestPipeline readPipeline = TestPipeline.create();
 
+  @Rule public transient TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+
   @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
 
   @Rule public transient ExpectedException expectedException = ExpectedException.none();
@@ -166,6 +172,17 @@ public GenericClass apply(GenericRecord input) {
     }
   }
 
+  private enum Sharding {
+    RUNNER_DETERMINED,
+    WITHOUT_SHARDING,
+    FIXED_3_SHARDS
+  }
+
+  private enum WriteMethod {
+    AVROIO_WRITE,
+    AVROIO_SINK
+  }
+
   private static final String SCHEMA_STRING =
       "{\"namespace\": \"example.avro\",\n"
           + " \"type\": \"record\",\n"
@@ -623,15 +640,18 @@ public ResourceId windowedFilename(
       String filenamePrefix =
           outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
 
+      IntervalWindow interval = (IntervalWindow) window;
+      String windowStr =
+          String.format("%s-%s", interval.start().toString(), interval.end().toString());
       String filename =
           String.format(
-              "%s-%s-%s-of-%s-pane-%s%s%s",
+              "%s-%s-%s-of-%s-pane-%s%s%s.avro",
               filenamePrefix,
-              window,
+              windowStr,
               shardNumber,
-              numShards - 1,
+              numShards,
               paneInfo.getIndex(),
-              paneInfo.isLast() ? "-final" : "",
+              paneInfo.isLast() ? "-last" : "",
               outputFileHints.getSuggestedFilenameSuffix());
       return outputFilePrefix
           .getCurrentDirectory()
@@ -652,13 +672,21 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  @Rule public transient TestPipeline windowedAvroWritePipeline = TestPipeline.create();
-
   @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
   public void testWriteWindowed() throws Throwable {
+    testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_WRITE);
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testWindowedAvroIOWriteViaSink() throws Throwable {
+    testWindowedAvroIOWriteUsingMethod(WriteMethod.AVROIO_SINK);
+  }
+
+  void testWindowedAvroIOWriteUsingMethod(WriteMethod method) throws IOException {
     Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite");
-    String baseFilename = baseDir.resolve("prefix").toString();
+    final String baseFilename = baseDir.resolve("prefix").toString();
 
     Instant base = new Instant(0);
     ArrayList<GenericClass> allElements = new ArrayList<>();
@@ -707,18 +735,45 @@ public void testWriteWindowed() throws Throwable {
                 Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
             .advanceWatermarkToInfinity();
 
-    FilenamePolicy policy =
-        new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
+    final PTransform<PCollection<GenericClass>, WriteFilesResult<Void>> write;
+    switch (method) {
+      case AVROIO_WRITE:
+        {
+          FilenamePolicy policy =
+              new WindowedFilenamePolicy(
+                  FileBasedSink.convertToFileResourceIfPossible(baseFilename));
+          write =
+              AvroIO.write(GenericClass.class)
+                  .to(policy)
+                  .withTempDirectory(
+                      StaticValueProvider.of(
+                          FileSystems.matchNewResource(baseDir.toString(), true)))
+                  .withWindowedWrites()
+                  .withNumShards(2)
+                  .withOutputFilenames();
+          break;
+        }
+
+      case AVROIO_SINK:
+        {
+          write =
+              FileIO.<GenericClass>write()
+                  .via(AvroIO.sink(GenericClass.class))
+                  .to(baseDir.toString())
+                  .withPrefix("prefix")
+                  .withSuffix(".avro")
+                  .withTempDirectory(baseDir.toString())
+                  .withNumShards(2);
+          break;
+        }
+
+      default:
+        throw new UnsupportedOperationException();
+    }
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(policy)
-                .withTempDirectory(
-                    StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true)))
-                .withWindowedWrites()
-                .withNumShards(2));
+        .apply(write);
     windowedAvroWritePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order
@@ -726,17 +781,17 @@ public void testWriteWindowed() throws Throwable {
     for (int shard = 0; shard < 2; shard++) {
       for (int window = 0; window < 2; window++) {
         Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
-        IntervalWindow intervalWindow =
+        IntervalWindow iw =
             new IntervalWindow(windowStart, Duration.standardMinutes(1));
-        expectedFiles.add(
-            new File(
-                baseFilename
-                    + "-"
-                    + intervalWindow.toString()
-                    + "-"
-                    + shard
-                    + "-of-1"
-                    + "-pane-0-final"));
+        String baseAndWindow = baseFilename + "-" + iw.start() + "-" + iw.end();
+        switch(method) {
+          case AVROIO_WRITE:
+            expectedFiles.add(new File(baseAndWindow + "-" + shard + "-of-2-pane-0-last.avro"));
+            break;
+          case AVROIO_SINK:
+            expectedFiles.add(new File(baseAndWindow + "-0000" + shard + "-of-00002.avro"));
+            break;
+        }
       }
     }
 
@@ -819,21 +874,16 @@ public String getDefaultDestination() {
     public FilenamePolicy getFilenamePolicy(String destination) {
       return DefaultFilenamePolicy.fromStandardParameters(
           StaticValueProvider.of(
-              baseDir.resolve("file_" + destination + ".txt", RESOLVE_FILE)),
-          null,
-          null,
+              baseDir.resolve("file_" + destination, RESOLVE_FILE)),
+          "-SSSSS-of-NNNNN",
+          ".avro",
           false);
     }
   }
 
-  private enum Sharding {
-    RUNNER_DETERMINED,
-    WITHOUT_SHARDING,
-    FIXED_3_SHARDS
-  }
-
-  private void testDynamicDestinationsWithSharding(Sharding sharding) throws Exception {
-    ResourceId baseDir =
+  private void testDynamicDestinationsUnwindowedWithSharding(
+      WriteMethod writeMethod, Sharding sharding) throws Exception {
+    final ResourceId baseDir =
         FileSystems.matchNewResource(
             Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testDynamicDestinations")
                 .toString(),
@@ -849,32 +899,105 @@ private void testDynamicDestinationsWithSharding(Sharding sharding) throws Excep
       expectedElements.put(
           prefix, createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
     }
-    PCollectionView<Map<String, String>> schemaView =
+    final PCollectionView<Map<String, String>> schemaView =
         writePipeline
             .apply("createSchemaView", Create.of(schemaMap))
             .apply(View.<String, String>asMap());
 
     PCollection<String> input =
         writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
-    AvroIO.TypedWrite<String, String, GenericRecord> write =
-        AvroIO.<String>writeCustomTypeToGenericRecords()
-            .to(new TestDynamicDestinations(baseDir, schemaView))
-            .withTempDirectory(baseDir);
 
-    switch (sharding) {
-      case RUNNER_DETERMINED:
-        break;
-      case WITHOUT_SHARDING:
-        write = write.withoutSharding();
+    switch (writeMethod) {
+      case AVROIO_WRITE: {
+        AvroIO.TypedWrite<String, String, GenericRecord> write =
+            AvroIO.<String>writeCustomTypeToGenericRecords()
+                .to(new TestDynamicDestinations(baseDir, schemaView))
+                .withTempDirectory(baseDir);
+
+        switch (sharding) {
+          case RUNNER_DETERMINED:
+            break;
+          case WITHOUT_SHARDING:
+            write = write.withoutSharding();
+            break;
+          case FIXED_3_SHARDS:
+            write = write.withNumShards(3);
+            break;
+          default:
+            throw new IllegalArgumentException("Unknown sharding " + sharding);
+        }
+
+        input.apply(write);
         break;
-      case FIXED_3_SHARDS:
-        write = write.withNumShards(3);
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown sharding " + sharding);
+      }
+
+      case AVROIO_SINK:
+        {
+          final AvroIO.RecordFormatter<String> formatter =
+              new AvroIO.RecordFormatter<String>() {
+                @Override
+                public GenericRecord formatRecord(String element, Schema schema) {
+                  String prefix = element.substring(0, 1);
+                  GenericRecord record = new GenericData.Record(schema);
+                  record.put(prefix + "full", element);
+                  record.put(prefix + "suffix", element.substring(1));
+                  return record;
+                }
+              };
+          FileIO.Write<String, String> write =
+              FileIO.<String, String>writeDynamic()
+                  .by(
+                      fn(
+                          new Contextful.Fn<String, String>() {
+                            @Override
+                            public String apply(String element, Context c) {
+                              c.sideInput(schemaView); // Ignore result
+                              return element.substring(0, 1);
+                            }
+                          },
+                          requiresSideInputs(schemaView)))
+                  .via(
+                      fn(
+                          new Contextful.Fn<String, FileIO.Sink<String>>() {
+                            @Override
+                            public FileIO.Sink<String> apply(String dest, Context c) {
+                              Schema schema =
+                                  new Schema.Parser().parse(c.sideInput(schemaView).get(dest));
+                              return AvroIO.sinkViaGenericRecords(schema, formatter);
+                            }
+                          },
+                          requiresSideInputs(schemaView)))
+                  .to(baseDir.toString())
+                  .withNaming(
+                      fn(
+                          new Contextful.Fn<String, FileIO.Write.FileNaming>() {
+                            @Override
+                            public FileIO.Write.FileNaming apply(String dest, Context c) {
+                              c.sideInput(schemaView); // Ignore result
+                              return FileIO.Write.defaultNaming("file_" + dest, ".avro");
+                            }
+                          },
+                          requiresSideInputs(schemaView)))
+                  .withTempDirectory(baseDir.toString())
+                  .withIgnoreWindowing();
+          switch (sharding) {
+            case RUNNER_DETERMINED:
+              break;
+            case WITHOUT_SHARDING:
+              write = write.withNumShards(1);
+              break;
+            case FIXED_3_SHARDS:
+              write = write.withNumShards(3);
+              break;
+            default:
+              throw new IllegalArgumentException("Unknown sharding " + sharding);
+          }
+
+          input.apply(write);
+          break;
+        }
     }
 
-    input.apply(write);
     writePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order.
@@ -883,19 +1006,21 @@ private void testDynamicDestinationsWithSharding(Sharding sharding) throws Excep
       String shardPattern;
       switch (sharding) {
         case RUNNER_DETERMINED:
-          shardPattern = "*";
+          shardPattern = "-*";
           break;
         case WITHOUT_SHARDING:
-          shardPattern = "00000-of-00001";
+          shardPattern = "-00000-of-00001";
           break;
         case FIXED_3_SHARDS:
-          shardPattern = "*-of-00003";
+          shardPattern = "-*-of-00003";
           break;
         default:
           throw new IllegalArgumentException("Unknown sharding " + sharding);
       }
       String expectedFilepattern =
-          baseDir.resolve("file_" + prefix + ".txt-" + shardPattern, RESOLVE_FILE).toString();
+          baseDir
+              .resolve("file_" + prefix + shardPattern + ".avro", RESOLVE_FILE)
+              .toString();
 
       PCollection<GenericRecord> records =
           readPipeline.apply(
@@ -909,19 +1034,42 @@ private void testDynamicDestinationsWithSharding(Sharding sharding) throws Excep
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsRunnerDeterminedSharding() throws Exception {
-    testDynamicDestinationsWithSharding(Sharding.RUNNER_DETERMINED);
+    testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod.AVROIO_WRITE, Sharding.RUNNER_DETERMINED);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsWithoutSharding() throws Exception {
-    testDynamicDestinationsWithSharding(Sharding.WITHOUT_SHARDING);
+    testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod.AVROIO_WRITE, Sharding.WITHOUT_SHARDING);
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testDynamicDestinationsWithNumShards() throws Exception {
-    testDynamicDestinationsWithSharding(Sharding.FIXED_3_SHARDS);
+    testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod.AVROIO_WRITE, Sharding.FIXED_3_SHARDS);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsViaSinkRunnerDeterminedSharding() throws Exception {
+    testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod.AVROIO_SINK, Sharding.RUNNER_DETERMINED);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsViaSinkWithoutSharding() throws Exception {
+    testDynamicDestinationsUnwindowedWithSharding(
+        WriteMethod.AVROIO_SINK, Sharding.WITHOUT_SHARDING);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testDynamicDestinationsViaSinkWithNumShards() throws Exception {
+    testDynamicDestinationsUnwindowedWithSharding(WriteMethod.AVROIO_SINK, Sharding.FIXED_3_SHARDS);
   }
 
   @Test
@@ -995,12 +1143,11 @@ private void runTestWrite(String[] expectedElements, int numShards) throws IOExc
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
 
-    AvroIO.Write<String> write = AvroIO.write(String.class).to(outputFilePrefix);
+    AvroIO.Write<String> write =
+        AvroIO.write(String.class).to(outputFilePrefix).withSuffix(".avro");
     if (numShards > 1) {
-      System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
     } else {
-      System.out.println("no sharding");
       write = write.withoutSharding();
     }
     writePipeline.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
@@ -1025,7 +1172,7 @@ public static void assertTestOutputs(
               DefaultFilenamePolicy.constructName(
                       FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
                       shardNameTemplate,
-                      "" /* no suffix */,
+                      ".avro",
                       i,
                       numShards,
                       null,
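
Stripped of the test scaffolding, the AVROIO_SINK branches above reduce to the following
shape (paths are placeholders; GenericClass is the test's reflect-based POJO):

    PCollection<GenericClass> records = ...;
    records.apply(
        FileIO.<GenericClass>write()
            .via(AvroIO.sink(GenericClass.class))
            .to("/tmp/avro-out")              // placeholder output directory
            .withPrefix("records")
            .withSuffix(".avro")
            .withTempDirectory("/tmp/avro-out")
            .withNumShards(2));

For destination-dependent schemas, the test instead pairs FileIO.writeDynamic() with
AvroIO.sinkViaGenericRecords(schema, formatter), resolving the schema for each destination
from a side input inside the via() callback.
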
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 1ade0d052f4..313e97461a8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -33,11 +33,16 @@
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
@@ -410,17 +415,8 @@ private static void assertOutputFiles(
     List<List<String>> actual = new ArrayList<>();
 
     for (File tmpFile : expectedFiles) {
-      try (BufferedReader reader = new BufferedReader(new FileReader(tmpFile))) {
-        List<String> currentFile = new ArrayList<>();
-        while (true) {
-          String line = reader.readLine();
-          if (line == null) {
-            break;
-          }
-          currentFile.add(line);
-        }
-        actual.add(currentFile);
-      }
+      List<String> currentFile = readLinesFromFile(tmpFile);
+      actual.add(currentFile);
     }
 
     List<String> expectedElements = new ArrayList<>(elems.length);
@@ -442,6 +438,20 @@ private static void assertOutputFiles(
     assertTrue(Iterables.all(actual, haveProperHeaderAndFooter(header, footer)));
   }
 
+  private static List<String> readLinesFromFile(File f) throws IOException {
+    List<String> currentFile = new ArrayList<>();
+    try (BufferedReader reader = new BufferedReader(new FileReader(f))) {
+      while (true) {
+        String line = reader.readLine();
+        if (line == null) {
+          break;
+        }
+        currentFile.add(line);
+      }
+    }
+    return currentFile;
+  }
+
   private static Function<List<String>, List<String>> removeHeaderAndFooter(
       final String header, final String footer) {
     return new Function<List<String>, List<String>>() {
@@ -648,4 +658,39 @@ public void testWindowedWritesWithOnceTrigger() throws Throwable {
 
     p.run();
   }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWriteViaSink() throws Exception {
+    List<String> data = ImmutableList.of("a", "b", "c", "d", "e", "f");
+    PAssert.that(
+            p.apply(Create.of(data))
+                .apply(
+                    FileIO.<String>write()
+                        .to(tempFolder.getRoot().toString())
+                        .withSuffix(".txt")
+                        .via(TextIO.sink())
+                        .withIgnoreWindowing())
+                .getPerDestinationOutputFilenames()
+                .apply(Values.<String>create())
+                .apply(TextIO.readAll()))
+        .containsInAnyOrder(data);
+
+    p.run();
+  }
+
+  @Test
+  public void testSink() throws Exception {
+    TextIO.Sink sink = TextIO.sink().withHeader("header").withFooter("footer");
+    File f = new File(tempFolder.getRoot(), "file");
+    try (WritableByteChannel chan = Channels.newChannel(new FileOutputStream(f))) {
+      sink.open(chan);
+      sink.write("a");
+      sink.write("b");
+      sink.write("c");
+      sink.flush();
+    }
+
+    assertEquals(Arrays.asList("header", "a", "b", "c", "footer"), readLinesFromFile(f));
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index 418db92c4bf..033f9627ce7 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -718,7 +718,7 @@ public void testWritingFailsTableDoesNotExist() throws Exception {
             Create.empty(
                 KvCoder.of(ByteStringCoder.of(), IterableCoder.of(ProtoCoder.of(Mutation.class)))));
 
-    // Exception will be thrown by write.validate() when write is applied.
+    // Exception will be thrown by write.validate() when writeToDynamic is applied.
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format("Table %s does not exist", table));
 
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index fd420249412..ec4052e7162 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -355,7 +355,7 @@ public void testWriting() throws Exception {
   public void testWritingFailsTableDoesNotExist() throws Exception {
     final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-    // Exception will be thrown by write.expand() when write is applied.
+    // Exception will be thrown by write.expand() when writeToDynamic is applied.
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format("Table %s does not exist", table));
     p.apply(Create.empty(HBaseMutationCoder.of()))


 



> Implement FileIO.write()
> ------------------------
>
>                 Key: BEAM-2865
>                 URL: https://issues.apache.org/jira/browse/BEAM-2865
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>             Fix For: 2.3.0
>
>
> Design doc: http://s.apache.org/fileio-write
> Discussion: https://lists.apache.org/thread.html/cc543556cc709a44ed92262207215eaa0e43a0f573c630b6360d4edc@%3Cdev.beam.apache.org%3E


