Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 00:45:25 UTC

[3/4] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

This converts FileBasedSink from IOChannelFactory to FileSystems, with
fallout changes to all existing transforms that use WriteFiles.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide a ResourceId in addition to a String when
setting the outputPrefix.

Other changes:

* Rethink FilenamePolicy as a function from ResourceId (base directory)
  to ResourceId (output file), moving the base directory into the
  context. This way, FilenamePolicy logic is truly independent of the
  base directory. Using ResourceId#resolve, a filename policy can add
  multiple path components, say, base/YYYY/MM/DD/file.txt, in a
  filesystem-independent way (see the sketch after this list).

  (Also add an optional extension parameter to the function, enabling an
  owning transform to pass in the suffix from a separately-configured
  compression factory or similar.)

* Make DefaultFilenamePolicy its own top-level class and move
  IOChannelUtils#constructName into it. This is the default FilenamePolicy
  used by FileBasedSink.
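
  For illustration, a windowed policy under the new API might look like the
  following minimal sketch (the class name and the date handling are
  illustrative additions, not code from this commit):

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    class DatePartitionedFiles extends FilenamePolicy {
      private static final DateTimeFormatter YEAR = DateTimeFormat.forPattern("yyyy");
      private static final DateTimeFormatter MONTH = DateTimeFormat.forPattern("MM");
      private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("dd");

      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        // Each resolve(..., RESOLVE_DIRECTORY) adds one path component, so this
        // emits base/YYYY/MM/DD/file-S-of-N<ext> with no filesystem-specific
        // path manipulation.
        return outputDirectory
            .resolve(YEAR.print(window.start()), StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(MONTH.print(window.start()), StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(DAY.print(window.start()), StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(
                String.format("file-%s-of-%s%s",
                    context.getShardNumber(), context.getNumShards(), extension),
                StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Windowed writes only.");
      }
    }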


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17358248
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17358248
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17358248

Branch: refs/heads/master
Commit: 17358248f8acbfa3c91e91b7ad80a9e0edb7e782
Parents: b4bafd0
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 10:10:28 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 17:44:59 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |  32 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   4 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 213 ++++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  22 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 169 ++++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 593 +++++++++----------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 126 ++--
 .../java/org/apache/beam/sdk/io/TextIO.java     | 249 ++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  20 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  15 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    |   6 +-
 .../apache/beam/sdk/util/NoopPathValidator.java |  12 +-
 .../beam/sdk/util/NumberedShardedFile.java      |   4 +-
 .../org/apache/beam/sdk/util/PathValidator.java |  15 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  55 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 330 +++++------
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  27 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  45 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  41 +-
 .../apache/beam/sdk/util/GcsPathValidator.java  |  22 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  15 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  16 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |  52 +-
 27 files changed, 1251 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 461b46d..ed35c8a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.examples.common;
 
+import static com.google.common.base.Verify.verifyNotNull;
+
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -47,9 +50,21 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
 
   @Override
   public PDone expand(PCollection<String> input) {
+    // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+    // component from that path for the PerWindowFiles.
+    String prefix = "";
+    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+    if (!resource.isDirectory()) {
+      prefix = verifyNotNull(
+          resource.getFilename(),
+          "A non-directory resource should have a non-null filename: %s",
+          resource);
+    }
+
     return input.apply(
         TextIO.write()
-            .to(new PerWindowFiles(filenamePrefix))
+            .to(resource.getCurrentDirectory())
+            .withFilenamePolicy(new PerWindowFiles(prefix))
             .withWindowedWrites()
             .withNumShards(3));
   }
@@ -62,32 +77,31 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
    */
   public static class PerWindowFiles extends FilenamePolicy {
 
-    private final String output;
+    private final String prefix;
 
-    public PerWindowFiles(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(output);
+    public PerWindowFiles(String prefix) {
+      this.prefix = prefix;
     }
 
-    public String   filenamePrefixForWindow(IntervalWindow window) {
-      return String.format(
-          "%s-%s-%s", output, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format("%s-%s-%s",
+          prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public String windowedFilename(WindowedContext context) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      return String.format(
-          "%s-%s-of-%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public String unwindowedFilename(Context context) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }
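
A quick check of the prefix-splitting behavior used in expand() above, as a
minimal sketch (the local path and the printed results are what one would
expect on a local filesystem, not output from this commit's tests):

    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.fs.ResourceId;

    public class PrefixSplitDemo {
      public static void main(String[] args) {
        ResourceId r = FileBasedSink.convertToFileResourceIfPossible("/tmp/out/part");
        System.out.println(r.isDirectory());          // false: parsed as a file
        System.out.println(r.getCurrentDirectory());  // the /tmp/out/ directory
        System.out.println(r.getFilename());          // "part", used as the prefix
      }
    }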

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 33ba80c..e7d4c64 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -29,8 +29,13 @@ import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -498,9 +503,12 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void writeWithRunnerDeterminedSharding() {
+    ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
     WriteFiles<Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer>("foo", "bar") {
+            new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
               @Override
               public FileBasedWriteOperation<Integer> createWriteOperation() {
                 return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 53d2ba3..18940d2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,10 +39,14 @@ import java.util.UUID;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -121,7 +125,17 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    WriteFiles<Object> original = WriteFiles.to(new TestSink());
+    ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
+    WriteFiles<Object> original = WriteFiles.to(
+        new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+          @Override
+          public FileBasedWriteOperation<Object> createWriteOperation() {
+            throw new IllegalArgumentException("Should not be used");
+          }
+        });
+    @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
     AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
@@ -206,18 +220,4 @@ public class WriteWithShardingFactoryTest {
     List<Integer> shards = fnTester.processBundle((long) count);
     assertThat(shards, containsInAnyOrder(13));
   }
-
-  private static class TestSink extends FileBasedSink<Object> {
-    public TestSink() {
-      super("", "");
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    @Override
-    public FileBasedWriteOperation<Object> createWriteOperation() {
-      throw new IllegalArgumentException("Should not be used");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a4a151..f7455b3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -87,12 +87,15 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -844,11 +847,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      FileBasedSink<T> sink = transform.getSink();
-      if (sink.getBaseOutputFilenameProvider().isAccessible()) {
+      ValueProvider<ResourceId> outputDirectory =
+          transform.getSink().getBaseOutputDirectoryProvider();
+      if (outputDirectory.isAccessible()) {
         PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputFilePrefixSupported(
-            sink.getBaseOutputFilenameProvider().get());
+        validator.validateOutputResourceSupported(
+            outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE));
       }
       return transform.expand(input);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index fa106ac..c0dfbee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -245,6 +245,10 @@ public class DataflowRunnerTest {
     options.setDataflowClient(buildMockDataflow());
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
+
+    // Configure the FileSystem registrar to use these options.
+    FileSystems.setDefaultConfigInWorkers(options);
+
     return options;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 3bb61a2..c7e7233 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -34,8 +35,12 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
@@ -46,7 +51,7 @@ import org.apache.beam.sdk.values.PDone;
  * {@link PTransform}s for reading and writing Avro files.
  *
  * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
- * specifying {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
+ * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
  * See {@link FileSystems} for information on supported file systems and filepatterns.
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
@@ -70,12 +75,14 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * } </pre>
  *
- * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, specifying
- * {@code AvroIO.write().to(String)} to specify the filename or sharded filepattern to write to.
- * See {@link FileSystems} for information on supported file systems and {@link ShardNameTemplate}
- * for information on naming of output files. You can also use {@code AvroIO.write()} with
- * {@link Write#to(FileBasedSink.FilenamePolicy)} to
- * specify a custom file naming policy.
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
+ * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default
+ * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a
+ * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional
+ * filename suffix (set via {@link Write#withSuffix(String)}), to generate output filenames in a
+ * sharded way. You can override this default write filename policy using
+ * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming
+ * policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
@@ -109,11 +116,6 @@ import org.apache.beam.sdk.values.PDone;
  * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the
  * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can
  * be changed or overridden using {@link AvroIO.Write#withCodec}.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for
- * more details.
  */
 public class AvroIO {
   /**
@@ -172,9 +174,9 @@ public class AvroIO {
 
   private static <T> Write.Builder<T> defaultWriteBuilder() {
     return new AutoValue_AvroIO_Write.Builder<T>()
-        .setFilenameSuffix("")
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
         .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
         .setCodec(Write.DEFAULT_CODEC)
         .setMetadata(ImmutableMap.<String, Object>of())
         .setWindowedWrites(false);
@@ -246,23 +248,16 @@ public class AvroIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
-    /**
-     * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
-     * multiple Avro files matching a sharding pattern).
-     *
-     * @param <T> the type of each of the elements of the input PCollection
-     */
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
     private static final SerializableAvroCodecFactory DEFAULT_CODEC =
         new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
     // This should be a multiple of 4 to not get a partial encoded byte.
     private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
-    @Nullable abstract String getFilenamePrefix();
-    abstract String getFilenameSuffix();
+    @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
+    @Nullable abstract String getShardTemplate();
+    @Nullable abstract String getFilenameSuffix();
     abstract int getNumShards();
-    abstract String getShardTemplate();
-    abstract Class<T> getRecordClass();
+    @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
@@ -278,7 +273,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(String filenamePrefix);
+      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
       abstract Builder<T> setFilenameSuffix(String filenameSuffix);
       abstract Builder<T> setNumShards(int numShards);
       abstract Builder<T> setShardTemplate(String shardTemplate);
@@ -293,54 +288,106 @@ public class AvroIO {
     }
 
     /**
-     * Writes to the file(s) with the given prefix. See {@link FileSystems} for information on
+     * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
+     * supported file systems.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     */
+    public Write<T> to(String outputPrefix) {
+      return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+    }
+
+    /**
+     * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
      * supported file systems.
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards}, and end
-     * in a common extension, if given by {@link #withSuffix}.
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
+     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     */
+    public Write<T> to(ResourceId outputPrefix) {
+      return toResource(StaticValueProvider.of(outputPrefix));
+    }
+
+    /**
+     * Like {@link #to(String)}.
+     */
+    public Write<T> to(ValueProvider<String> outputPrefix) {
+      return toResource(NestedValueProvider.of(outputPrefix,
+          new SerializableFunction<String, ResourceId>() {
+            @Override
+            public ResourceId apply(String input) {
+              return FileBasedSink.convertToFileResourceIfPossible(input);
+            }
+          }));
+    }
+
+    /**
+     * Like {@link #to(ResourceId)}.
      */
-    public Write<T> to(String filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+      return toBuilder().setFilenamePrefix(outputPrefix).build();
     }
 
-    /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
-    public Write<T> to(FilenamePolicy filenamePolicy) {
+    /**
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+     */
+    public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
-     * Writes to the file(s) with the given filename suffix.
+     * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     *
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
+     */
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return toBuilder().setShardTemplate(shardTemplate).build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. This option may only be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
      */
     public Write<T> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
-     * Uses the provided shard count. See {@link ShardNameTemplate} for a description of shard
-     * templates.
+     * Configures the number of output shards produced overall (when using unwindowed writes) or
+     * per-window (when using windowed writes).
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
+     * @param numShards the number of shards to use, or 0 to let the system decide.
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
-    /** Uses the given {@link ShardNameTemplate} for naming output files. */
-    public Write<T> withShardNameTemplate(String shardTemplate) {
-      return toBuilder().setShardTemplate(shardTemplate).build();
-    }
-
     /**
-     * Forces a single file as output.
+     * Forces a single file as output and an empty shard name template. This option is only compatible
+     * with unwindowed writes.
+     *
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
@@ -351,9 +398,9 @@ public class AvroIO {
     /**
      * Preserves windowing of input elements and writes them to files based on the element's window.
      *
-     * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated
-     * using {@link FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}.
-     * See also {@link WriteFiles#withWindowedWrites()}.
+     * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will
+     * be generated using {@link FilenamePolicy#windowedFilename}. See also
+     * {@link WriteFiles#withWindowedWrites()}.
      */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
@@ -386,36 +433,28 @@ public class AvroIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of an AvroIO.Write transform");
-      }
-      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
-        throw new IllegalStateException(
-            "cannot set both a filename policy and a filename prefix");
-      }
-      if (getSchema() == null) {
-        throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of an AvroIO.Write transform.");
+      checkState(
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or suffix.");
+      checkState(getSchema() != null,
+          "Need to set the schema of an AvroIO.Write transform.");
+
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
       }
 
-      WriteFiles<T> write = null;
-      if (getFilenamePolicy() != null) {
-        write = WriteFiles.to(
-            new AvroSink<>(
-                getFilenamePolicy(),
-                AvroCoder.of(getRecordClass(), getSchema()),
-                getCodec(),
-                getMetadata()));
-      } else {
-        write = WriteFiles.to(
+      WriteFiles<T> write = WriteFiles.to(
             new AvroSink<>(
                 getFilenamePrefix(),
-                getFilenameSuffix(),
-                getShardTemplate(),
+                usedFilenamePolicy,
                 AvroCoder.of(getRecordClass(), getSchema()),
                 getCodec(),
                 getMetadata()));
-      }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -428,17 +467,25 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
+      checkState(
+          getFilenamePrefix() != null,
+          "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
+      String outputPrefixString = null;
+      if (getFilenamePrefix().isAccessible()) {
+        ResourceId dir = getFilenamePrefix().get();
+        outputPrefixString = dir.toString();
+      } else {
+        outputPrefixString = getFilenamePrefix().toString();
+      }
       builder
           .add(DisplayData.item("schema", getRecordClass())
             .withLabel("Record Schema"))
-          .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix())
+          .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-              .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-              .withLabel("Output File Suffix"),
-              "")
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+              .withLabel("Output Shard Name Template"))
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+              .withLabel("Output File Suffix"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"),
               0)
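
Putting the reworked Write API together, a typical caller might now write the
following (a hedged sketch; the Pipeline p, the Avro-generated class MyRecord,
and all paths are illustrative):

    PCollection<MyRecord> records =
        p.apply(AvroIO.read(MyRecord.class).from("/tmp/in/*.avro"));
    records.apply(
        AvroIO.write(MyRecord.class)
            .to("/tmp/avro/records")  // output prefix for DefaultFilenamePolicy
            .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX)
            .withSuffix(".avro"));    // e.g. records-00000-of-00003.avro

Per the checkState calls above, a custom policy is configured instead via
to(ResourceId) plus withFilenamePolicy(...), and cannot be combined with a
shard name template or suffix.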

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 46bb4f3..7d42574 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -27,7 +27,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
@@ -37,24 +38,13 @@ class AvroSink<T> extends FileBasedSink<T> {
   private final ImmutableMap<String, Object> metadata;
 
   AvroSink(
+      ValueProvider<ResourceId> outputPrefix,
       FilenamePolicy filenamePolicy,
       AvroCoder<T> coder,
       SerializableAvroCodecFactory codec,
       ImmutableMap<String, Object> metadata) {
-    super(filenamePolicy);
-    this.coder = coder;
-    this.codec = codec;
-    this.metadata = metadata;
-  }
-
-  AvroSink(
-      String baseOutputFilename,
-      String extension,
-      String fileNameTemplate,
-      AvroCoder<T> coder,
-      SerializableAvroCodecFactory codec,
-      ImmutableMap<String, Object> metadata) {
-    super(baseOutputFilename, extension, fileNameTemplate);
+    // Avro handles compression internally using the codec.
+    super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED);
     this.coder = coder;
     this.codec = codec;
     this.metadata = metadata;
@@ -82,7 +72,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
 
     @Override
-    public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
+    public FileBasedWriter<T> createWriter() throws Exception {
       return new AvroWriter<>(this, coder, codec, metadata);
     }
   }
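
Because AvroSink now always opens an UNCOMPRESSED channel, output compression
is controlled solely by the Avro codec on the transform. Continuing the sketch
above (snappyCodec() is standard Avro; the default remains
CodecFactory.deflateCodec(6)):

    import org.apache.avro.file.CodecFactory;

    records.apply(
        AvroIO.write(MyRecord.class)
            .to("/tmp/avro/records")
            .withCodec(CodecFactory.snappyCodec()));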

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
new file mode 100644
index 0000000..07bc2db
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/**
+ * A default {@link FilenamePolicy} for unwindowed files. This policy is constructed using three
+ * parameters that together define the output name of a sharded file, in conjunction with the number
+ * of shards and index of the particular file, using {@link #constructName}.
+ *
+ * <p>Most users of unwindowed files will use this {@link DefaultFilenamePolicy}. For more advanced
+ * uses in generating different files for each window and other sharding controls, see the
+ * {@code WriteOneFilePerWindow} example pipeline.
+ */
+public final class DefaultFilenamePolicy extends FilenamePolicy {
+  /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
+  public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+  // Pattern that matches shard placeholders within a shard template.
+  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+
+  /**
+   * Constructs a new {@link DefaultFilenamePolicy}.
+   *
+   * @see DefaultFilenamePolicy for more information on the arguments to this function.
+   */
+  @VisibleForTesting
+  DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) {
+    this.prefix = prefix;
+    this.shardTemplate = shardTemplate;
+    this.suffix = suffix;
+  }
+
+  /**
+   * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename
+   * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null
+   * shard name template and suffix.
+   *
+   * <p>Any filename component of the provided resource will be used as the filename prefix.
+   *
+   * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
+   * will be used.
+   *
+   * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
+   */
+  public static DefaultFilenamePolicy constructUsingStandardParameters(
+      ValueProvider<ResourceId> outputPrefix,
+      @Nullable String shardTemplate,
+      @Nullable String filenameSuffix) {
+    return new DefaultFilenamePolicy(
+        NestedValueProvider.of(outputPrefix, new ExtractFilename()),
+        firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE),
+        firstNonNull(filenameSuffix, ""));
+  }
+
+  private final ValueProvider<String> prefix;
+  private final String shardTemplate;
+  private final String suffix;
+
+  /**
+   * Constructs a fully qualified name from components.
+   *
+   * <p>The name is built from a prefix, shard template (with shard numbers
+   * applied), and a suffix.  All components are required, but may be empty
+   * strings.
+   *
+   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
+   * are replaced with the shard number, or number of shards respectively.  The
+   * numbers are formatted with leading zeros to match the length of the
+   * repeated sequence of letters.
+   *
+   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
+   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
+   * produced:  "output-001-of-100.txt".
+   */
+  public static String constructName(
+      String prefix, String shardTemplate, String suffix, int shardNum, int numShards) {
+    // Matcher API works with StringBuffer, rather than StringBuilder.
+    StringBuffer sb = new StringBuffer();
+    sb.append(prefix);
+
+    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
+    while (m.find()) {
+      boolean isShardNum = (m.group(1).charAt(0) == 'S');
+
+      char[] zeros = new char[m.end() - m.start()];
+      Arrays.fill(zeros, '0');
+      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
+      String formatted = df.format(isShardNum ? shardNum : numShards);
+      m.appendReplacement(sb, formatted);
+    }
+    m.appendTail(sb);
+
+    sb.append(suffix);
+    return sb.toString();
+  }
+
+  @Override
+  @Nullable
+  public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
+      String extension) {
+    String filename =
+        constructName(
+            prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards())
+        + extension;
+    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  @Override
+  public ResourceId windowedFilename(ResourceId outputDirectory,
+      WindowedContext c, String extension) {
+    throw new UnsupportedOperationException("There is no default policy for windowed file"
+        + " output. Please provide an explicit FilenamePolicy to generate filenames.");
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    String filenamePattern;
+    if (prefix.isAccessible()) {
+      filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix);
+    } else {
+      filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix);
+    }
+    builder.add(
+        DisplayData.item("filenamePattern", filenamePattern)
+            .withLabel("Filename Pattern"));
+  }
+
+  private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
+    @Override
+    public String apply(ResourceId input) {
+      if (input.isDirectory()) {
+        return "";
+      } else {
+        return firstNonNull(input.getFilename(), "");
+      }
+    }
+  }
+}
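
Restating the constructName javadoc example as a runnable check:

    String name = DefaultFilenamePolicy.constructName(
        "output", "-SSS-of-NNN", ".txt", 1 /* shardNum */, 100 /* numShards */);
    // name is "output-001-of-100.txt"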

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 7ba608c..0daf5dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -21,10 +21,11 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Verify.verifyNotNull;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,26 +33,27 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Path;
-import java.text.DecimalFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -61,13 +63,12 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,52 +104,12 @@ import org.slf4j.LoggerFactory;
  * PCollections into separate files per window pane. This allows file output from unbounded
 * PCollections, and also works for bounded PCollections.
  *
- * <p>Supported file systems are those registered with {@link IOChannelUtils}.
+ * <p>Supported file systems are those registered with {@link FileSystems}.
  *
  * @param <T> the type of values written to the sink.
  */
 public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
-  // Pattern that matches shard placeholders within a shard template.
-  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
-
-  /**
-   * Constructs a fully qualified name from components.
-   *
-   * <p>The name is built from a prefix, shard template (with shard numbers
-   * applied), and a suffix.  All components are required, but may be empty
-   * strings.
-   *
-   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
-   * are replaced with the shard number, or number of shards respectively.  The
-   * numbers are formatted with leading zeros to match the length of the
-   * repeated sequence of letters.
-   *
-   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
-   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
-   * produced:  "output-001-of-100.txt".
-   */
-  public static String constructName(String prefix,
-      String shardTemplate, String suffix, int shardNum, int numShards) {
-    // Matcher API works with StringBuffer, rather than StringBuilder.
-    StringBuffer sb = new StringBuffer();
-    sb.append(prefix);
-
-    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
-    while (m.find()) {
-      boolean isShardNum = (m.group(1).charAt(0) == 'S');
-
-      char[] zeros = new char[m.end() - m.start()];
-      Arrays.fill(zeros, '0');
-      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-      String formatted = df.format(isShardNum ? shardNum : numShards);
-      m.appendReplacement(sb, formatted);
-    }
-    m.appendTail(sb);
-
-    sb.append(suffix);
-    return sb.toString();
-  }
 
   /**
    * Directly supported file output compression types.
@@ -213,12 +174,32 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /**
+   * This is a helper function for converting a user-provided output filename prefix
+   * into a {@link ResourceId} for writing output files. See {@link TextIO.Write#to(String)} for an
+   * example use case.
+   *
+   * <p>Typically, the input prefix will be something like {@code /tmp/foo/bar}, and the user would
+   * like output files to be named as {@code /tmp/foo/bar-0-of-3.txt}. Thus, this function tries to
+   * interpret the provided string as a file {@link ResourceId} path.
+   *
+   * <p>However, this may fail, for example if the user gives a prefix that is a directory. E.g.,
+   * {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a
+   * file will fail and this function will return a directory {@link ResourceId} instead.
+   */
+  public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
+    try {
+      return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */);
+    } catch (Exception e) {
+      return FileSystems.matchNewResource(outputPrefix, true /* isDirectory */);
+    }
+  }
+
+  /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
    * underlying channel. The default is to not compress the output using
    * {@link CompressionType#UNCOMPRESSED}.
    */
-  protected final WritableByteChannelFactory writableByteChannelFactory;
-
+  private final WritableByteChannelFactory writableByteChannelFactory;
 
   /**
    * A naming policy for output files.
@@ -294,23 +275,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
     /**
      * When a sink has requested windowed or triggered output, this method will be invoked to return
-     * the filename. The {@link WindowedContext} object gives access to the window and pane, as
-     * well as sharding information. The policy must return unique and consistent filenames
+     * the file {@link ResourceId resource} to be created given the base output directory and a
+     * (possibly empty) extension from {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
+     *
+     * <p>The {@link WindowedContext} object gives access to the window and pane,
+     * as well as sharding information. The policy must return unique and consistent filenames
      * for different windows and panes.
      */
-    public abstract String windowedFilename(WindowedContext c);
-
-    /**
-     * When a sink has not requested windowed output, this method will be invoked to return the
-     * filename. The {@link Context} object only provides sharding information, which is used by
-     * the policy to generate unique and consistent filenames.
-     */
-    public abstract String unwindowedFilename(Context c);
+    public abstract ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension);
 
     /**
-     * @return The base filename for all output files.
+     * When a sink has not requested windowed or triggered output, this method will be invoked to
+     * return the file {@link ResourceId resource} to be created given the base output directory and
+     * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
+     *
+     * <p>The {@link Context} object only provides sharding information, which is used by the policy
+     * to generate unique and consistent filenames.
      */
-    public abstract ValueProvider<String> getBaseOutputFilenameProvider();
+    @Nullable public abstract ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context c, String extension);
 
     /**
      * Populates the display data.
@@ -319,129 +305,55 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
+  /** The policy used to generate names of files to be produced. */
+  @VisibleForTesting
+  final FilenamePolicy filenamePolicy;
+  /** The directory to which files will be written. */
+  private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
+
   /**
-   * A default filename policy.
+   * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
    */
-  protected class DefaultFilenamePolicy extends FilenamePolicy {
-    ValueProvider<String> baseOutputFilename;
-    String extension;
-    String fileNamingTemplate;
-
-    public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, String extension,
-                                 String fileNamingTemplate) {
-      this.baseOutputFilename = baseOutputFilename;
-      if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
-        this.extension = extension + getFileExtension(
-            writableByteChannelFactory.getFilenameSuffix());
-      } else {
-        this.extension = extension;
-      }
-      this.fileNamingTemplate = fileNamingTemplate;
-    }
-
-    @Override
-    public String unwindowedFilename(FilenamePolicy.Context context) {
-      if (context.numShards <= 0) {
-        return null;
-      }
-
-      String suffix = getFileExtension(extension);
-      String filename = constructName(
-          baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
-          context.getNumShards());
-      return filename;
-    }
-
-    @Override
-    public String windowedFilename(FilenamePolicy.WindowedContext c) {
-      throw new UnsupportedOperationException("There is no default policy for windowed file"
-          + " output. Please provide an explicit FilenamePolicy to generate filenames.");
-    }
-
-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return baseOutputFilename;
-    }
+  public FileBasedSink(
+      ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
+    this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
+  }
 
+  private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
     @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-    String fileNamePattern = String.format("%s%s%s",
-        baseOutputFilename.isAccessible()
-        ? baseOutputFilename.get() : baseOutputFilename.toString(),
-        fileNamingTemplate, getFileExtension(extension));
-    builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
-      .withLabel("File Name Pattern"));
+    public ResourceId apply(ResourceId input) {
+      return input.getCurrentDirectory();
     }
   }
 
   /**
-   * The policy used to generate output filenames.
-   */
-  protected FilenamePolicy fileNamePolicy;
-
-  /**
-   * Construct a FileBasedSink with the given base output filename and extension. A
-   * {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} will be used.
-   */
-  public FileBasedSink(String baseOutputFilename, String extension) {
-    this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
-  }
-
-  /**
-   * Construct a FileBasedSink with the given base output filename, extension, and
-   * {@link WritableByteChannelFactory}.
+   * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
    */
-  public FileBasedSink(String baseOutputFilename, String extension,
+  public FileBasedSink(
+      ValueProvider<ResourceId> baseOutputDirectoryProvider,
+      FilenamePolicy filenamePolicy,
       WritableByteChannelFactory writableByteChannelFactory) {
-    this(StaticValueProvider.of(baseOutputFilename), extension,
-        ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+    this.baseOutputDirectoryProvider =
+        NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
+    this.filenamePolicy = filenamePolicy;
+    this.writableByteChannelFactory = writableByteChannelFactory;
   }
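
For reference, a minimal sketch of constructing a sink against the converted
API; MySink and MyFilenamePolicy are hypothetical subclasses, not part of this
change:

    ResourceId baseDir =
        FileSystems.matchNewResource("gs://bucket/output/", true /* isDirectory */);
    FileBasedSink<String> sink =
        new MySink(StaticValueProvider.of(baseDir), new MyFilenamePolicy());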
 
   /**
-   * Construct a FileBasedSink with the given base output filename, extension, and file naming
-   * template. A {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED}
-   * will be used.
-   *
-   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
+   * Returns the base directory inside which files will be written according to the configured
+   * {@link FilenamePolicy}.
    */
-  public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate,
-        CompressionType.UNCOMPRESSED);
-  }
-
-  /**
-   * Construct a FileBasedSink with the given base output filename, extension, file naming template,
-   * and {@link WritableByteChannelFactory}.
-   *
-   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
-   */
-  public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
-      String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
-    this.writableByteChannelFactory = writableByteChannelFactory;
-    this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, extension,
-        fileNamingTemplate);
-  }
-
-  public FileBasedSink(FilenamePolicy fileNamePolicy) {
-    this(fileNamePolicy, CompressionType.UNCOMPRESSED);
-
-  }
-
-  public FileBasedSink(FilenamePolicy fileNamePolicy,
-                       WritableByteChannelFactory writableByteChannelFactory) {
-    this.fileNamePolicy = fileNamePolicy;
-    this.writableByteChannelFactory = writableByteChannelFactory;
+  public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
+    return baseOutputDirectoryProvider;
   }
 
   /**
-   * Returns the base output filename for this file based sink.
+   * Returns the policy by which files will be named inside of the base output directory. Note that
+   * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
+   * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
    */
-  public ValueProvider<String> getBaseOutputFilenameProvider() {
-    return fileNamePolicy.getBaseOutputFilenameProvider();
-  }
-
-  public FilenamePolicy getFileNamePolicy() {
-    return fileNamePolicy;
+  public final FilenamePolicy getFilenamePolicy() {
+    return filenamePolicy;
   }
 
   public void validate(PipelineOptions options) {}
@@ -453,22 +365,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   public abstract FileBasedWriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
-    getFileNamePolicy().populateDisplayData(builder);
-  }
-
-  /**
-   * Returns the file extension to be used. If the user did not request a file
-   * extension then this method returns the empty string. Otherwise this method
-   * adds a {@code "."} to the beginning of the users extension if one is not present.
-   */
-  private static String getFileExtension(String usersExtension) {
-    if (usersExtension == null || usersExtension.isEmpty()) {
-      return "";
-    }
-    if (usersExtension.startsWith(".")) {
-      return usersExtension;
-    }
-    return "." + usersExtension;
+    getFilenamePolicy().populateDisplayData(builder);
   }
 
   /**
@@ -518,15 +415,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     protected final FileBasedSink<T> sink;
 
     /** Directory for temporary output files. */
-    protected final ValueProvider<String> tempDirectory;
+    protected final ValueProvider<ResourceId> tempDirectory;
 
     /** Whether windowed writes are being used. */
    protected boolean windowedWrites;
 
-    /** Constructs a temporary file path given the temporary directory and a filename. */
-    protected static String buildTemporaryFilename(String tempDirectory, String filename)
+    /** Constructs a temporary file resource given the temporary directory and a filename. */
+    protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename)
         throws IOException {
-      return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename);
+      return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
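
For example (with a made-up temp directory and bundle id), RESOLVE_FILE appends
a single filename component in a filesystem-independent way:

    ResourceId tempDir =
        FileSystems.matchNewResource("/tmp/out/.temp-dir/", true /* isDirectory */);
    ResourceId tempFile =
        tempDir.resolve("bundle-uid-1234", StandardResolveOptions.RESOLVE_FILE);
    // tempFile now names /tmp/out/.temp-dir/bundle-uid-1234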
 
     /**
@@ -540,30 +437,26 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      */
     public FileBasedWriteOperation(FileBasedSink<T> sink) {
       this(sink, NestedValueProvider.of(
-          sink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
+          sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
     }
 
     private static class TemporaryDirectoryBuilder
-        implements SerializableFunction<String, String> {
+        implements SerializableFunction<ResourceId, ResourceId> {
+      private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
+      private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
+          DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
       // The intent of the code is to have a consistent value of tempDirectory across
       // all workers, which wouldn't happen if now() was called inline.
-      Instant now = Instant.now();
+      private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
+      // Multiple different sinks may be used in the same output directory; use tempId to create a
+      // separate temp directory for each.
+      private final Long tempId = TEMP_COUNT.getAndIncrement();
 
       @Override
-      public String apply(String baseOutputFilename) {
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
-          Path baseOutputPath = factory.toPath(baseOutputFilename);
-          return baseOutputPath
-              .resolveSibling(
-                  "temp-beam-"
-                  + baseOutputPath.getFileName()
-                  + "-"
-                  + now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
-              .toString();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+      public ResourceId apply(ResourceId baseOutputDirectory) {
+        // The temp directory name includes a timestamp and a unique ID.
+        String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
+        return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
       }
     }
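
Illustrative output of this builder (the timestamp and counter values are
examples only): for a base output directory of gs://bucket/out/, successive
sinks created in the same JVM would get temporary directories such as

    gs://bucket/out/.temp-beam-2017-05-03_17-44-59-0/
    gs://bucket/out/.temp-beam-2017-05-03_17-44-59-1/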
 
@@ -573,11 +466,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @param sink the FileBasedSink that will be used to configure this write operation.
      * @param tempDirectory the base directory to be used for temporary output files.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
+    public FileBasedWriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
-    private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
+    private FileBasedWriteOperation(
+        FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
@@ -587,7 +481,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}. This
      * method must not mutate the state of the object.
      */
-    public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
+    public abstract FileBasedWriter<T> createWriter() throws Exception;
 
     /**
      * Indicates that the operation will be performing windowed writes.
@@ -610,11 +504,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * @param writerResults the results of writes (FileResult).
      */
-    public void finalize(Iterable<FileResult> writerResults,
-                         PipelineOptions options) throws Exception {
+    public void finalize(Iterable<FileResult> writerResults) throws Exception {
       // Collect names of temporary files and rename them.
-      Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
-      copyToOutputFiles(outputFilenames, options);
+      Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
+      copyToOutputFiles(outputFilenames);
 
       // Optionally remove temporary files.
       // We remove the entire temporary directory, rather than specifically removing the files
@@ -625,12 +518,13 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       //
       // When windows or triggers are specified, files are generated incrementally so deleting
       // the entire directory in finalize is incorrect.
-      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options);
+      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
     }
 
-    protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> writerResults) {
-      Map<String, String> outputFilenames = new HashMap<>();
-      List<String> files = new ArrayList<>();
+    protected final Map<ResourceId, ResourceId> buildOutputFilenames(
+        Iterable<FileResult> writerResults) {
+      Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+      List<ResourceId> files = new ArrayList<>();
       for (FileResult result : writerResults) {
         if (result.getDestinationFilename() != null) {
           outputFilenames.put(result.getFilename(), result.getDestinationFilename());
@@ -639,20 +533,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         }
       }
 
-      // If the user does not specify numShards() (not supported with windowing). Then the
-      // writerResults won't contain destination filenames, so we dynamically generate them here.
+      // If the user did not specify numShards() (which is not supported with windowing), the
+      // writerResults won't contain destination filenames, so we dynamically generate them here.
       if (files.size() > 0) {
         checkArgument(outputFilenames.isEmpty());
         // Sort files for idempotence.
-        files = Ordering.natural().sortedCopy(files);
-        FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
+        files = Ordering.usingToString().sortedCopy(files);
+        ResourceId outputDirectory = getSink().getBaseOutputDirectoryProvider().get();
+        FilenamePolicy filenamePolicy = getSink().filenamePolicy;
         for (int i = 0; i < files.size(); i++) {
           outputFilenames.put(files.get(i),
-              filenamePolicy.unwindowedFilename(new Context(i, files.size())));
+              filenamePolicy.unwindowedFilename(outputDirectory, new Context(i, files.size()),
+                  getSink().getExtension()));
         }
       }
 
-      int numDistinctShards = new HashSet<String>(outputFilenames.values()).size();
+      int numDistinctShards = new HashSet<ResourceId>(outputFilenames.values()).size();
       checkState(numDistinctShards == outputFilenames.size(),
          "Only generated %s distinct file names for %s files.",
          numDistinctShards, outputFilenames.size());
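
To make the new callback shape concrete, here is a hedged sketch of an
unwindowed-only FilenamePolicy against this API; the class name and shard
numbering format are illustrative, not part of this commit:

    class PerShardPolicy extends FileBasedSink.FilenamePolicy {
      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context c, String extension) {
        // e.g. part-00000-of-00004.gz when extension is ".gz"
        String filename = String.format(
            "part-%05d-of-%05d%s", c.getShardNumber(), c.getNumShards(), extension);
        return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext c, String extension) {
        throw new UnsupportedOperationException("This policy supports only unwindowed writes.");
      }
    }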
@@ -673,15 +568,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * @param filenames the filenames of temporary files.
      */
-    protected final void copyToOutputFiles(Map<String, String> filenames,
-                                           PipelineOptions options)
+    @VisibleForTesting
+    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
         throws IOException {
       int numFiles = filenames.size();
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        IOChannelFactory channelFactory =
-            IOChannelUtils.getFactory(filenames.values().iterator().next());
-        channelFactory.copy(filenames.keySet(), filenames.values());
+        List<ResourceId> srcFiles = new ArrayList<>(filenames.size());
+        List<ResourceId> dstFiles = new ArrayList<>(filenames.size());
+        for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) {
+          srcFiles.add(srcDestPair.getKey());
+          dstFiles.add(srcDestPair.getValue());
+        }
+        // During a failure case, files may have been deleted in an earlier step. Thus
+        // we ignore missing files here.
+        FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
       } else {
         LOG.info("No output files to write.");
       }
@@ -694,13 +595,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
-    protected final void removeTemporaryFiles(Set<String> knownFiles,
-                                              boolean shouldRemoveTemporaryDirectory,
-                                              PipelineOptions options)
-        throws IOException {
-      String tempDir = tempDirectory.get();
+    @VisibleForTesting
+    final void removeTemporaryFiles(
+        Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
+      ResourceId tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempDir);
 
       // To partially mitigate the effects of filesystems with eventually-consistent
       // directory matching APIs, we remove not only files that the filesystem says exist
@@ -709,17 +608,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
-      Set<String> matches = new HashSet<>();
+      Set<ResourceId> matches = new HashSet<>();
      // TODO: Windows OS cannot resolve and match '*' in the path;
      // ignore the exception for now to avoid failing the pipeline.
       if (shouldRemoveTemporaryDirectory) {
         try {
-          matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+          MatchResult singleMatch = Iterables.getOnlyElement(
+              FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
+          for (Metadata matchResult : singleMatch.metadata()) {
+            matches.add(matchResult.resourceId());
+          }
         } catch (Exception e) {
           LOG.warn("Failed to match temporary files under: [{}].", tempDir);
         }
       }
-      Set<String> allMatches = new HashSet<>(matches);
+      Set<ResourceId> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
       LOG.debug(
           "Removing {} temporary files found under {} ({} matched glob, {} known files)",
@@ -727,23 +630,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
           tempDir,
           matches.size(),
           allMatches.size() - matches.size());
+      FileSystems.delete(allMatches, StandardMoveOptions.IGNORE_MISSING_FILES);
+
       // Deletion of the temporary directory might fail, if not all temporary files are removed.
       try {
-        factory.remove(allMatches);
-        factory.remove(ImmutableList.of(tempDir));
+        FileSystems.delete(
+            Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
       } catch (Exception e) {
         LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
       }
     }
 
     /**
-     * Provides a coder for {@link FileBasedSink.FileResult}.
-     */
-    public final Coder<FileResult> getFileResultCoder() {
-      return FileResultCoder.of();
-    }
-
-    /**
      * Returns the FileBasedSink for this write operation.
      */
     public FileBasedSink<T> getSink() {
@@ -751,6 +649,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
+  /** Returns the extension that will be written to the produced files. */
+  protected final String getExtension() {
+    String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
+    if (!extension.isEmpty() && !extension.startsWith(".")) {
+      extension = "." + extension;
+    }
+    return extension;
+  }
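
Worked examples of the rule above: a factory suffix of "gz" yields the
extension ".gz", a suffix already starting with a dot (say ".bz2") passes
through unchanged, and a null or empty suffix yields the empty string.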
+
   /**
    * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
    * implementations provide a method that can write a single value to a
@@ -760,19 +667,17 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
    * channel.
    *
-   * <p>Multiple FileBasedWriter instances may be created on the same worker, and therefore any
-   * access to static members or methods should be thread safe.
+   * <p>Multiple {@link FileBasedWriter} instances may be created on the same worker, and therefore
+   * any access to static members or methods should be thread safe.
    *
    * @param <T> the type of values to write.
    */
   public abstract static class FileBasedWriter<T> {
     private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
 
-    final FileBasedWriteOperation<T> writeOperation;
+    private final FileBasedWriteOperation<T> writeOperation;
 
-    /**
-     * Unique id for this output bundle.
-     */
+    /** Unique id for this output bundle. */
     private String id;
 
     private BoundedWindow window;
@@ -780,10 +685,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private int shard = -1;
     private int numShards = -1;
 
-    /**
-     * The filename of the output bundle.
-     */
-    private String filename;
+    /** The output file for this bundle. May be null if opening failed. */
+    private @Nullable ResourceId outputFile;
 
     /**
      * The channel to write to.
@@ -801,7 +704,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private final String mimeType;
 
     /**
-     * Construct a new FileBasedWriter with a base filename.
+     * Construct a new {@link FileBasedWriter} that will produce files of the given MIME type.
      */
     public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
@@ -846,11 +749,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * shard and numShards are populated for the case of static sharding. In cases where the
      * runner is dynamically picking sharding, shard and numShards might both be set to -1.
      */
-    public final void openWindowed(String uId,
-                                   BoundedWindow window,
-                                   PaneInfo paneInfo,
-                                   int shard,
-                                   int numShards) throws Exception {
+    public final void openWindowed(
+        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, int numShards)
+        throws Exception {
       if (!getWriteOperation().windowedWrites) {
        throw new IllegalStateException("openWindowed called on a non-windowed sink.");
       }
@@ -875,6 +776,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       open(uId, null, null, shard, numShards);
     }
 
+    // Helper to close a channel after an exception has already occurred.
+    // Always rethrows the prior exception, attaching any new closing exception as suppressed.
+    private static void closeChannelAndThrow(
+        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
+      try {
+        channel.close();
+      } catch (Exception e) {
+        LOG.error("Closing channel for {} failed.", filename, e);
+        prior.addSuppressed(e);
+        throw prior;
+      }
+    }
+
     private void open(String uId,
                       @Nullable BoundedWindow window,
                       @Nullable PaneInfo paneInfo,
@@ -885,64 +799,98 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       this.paneInfo = paneInfo;
       this.shard = shard;
       this.numShards = numShards;
-      filename = FileBasedWriteOperation.buildTemporaryFilename(
-          getWriteOperation().tempDirectory.get(), uId);
-      LOG.debug("Opening {}.", filename);
+      ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
+      outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
+      verifyNotNull(
+          outputFile, "FileSystems are not allowed to return null from resolve: %s", tempDirectory);
+
       final WritableByteChannelFactory factory =
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      channel = factory.create(IOChannelUtils.create(filename, channelMimeType));
+      LOG.debug("Opening {} for write with MIME type {}.", outputFile, channelMimeType);
+      WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
+      try {
+        channel = factory.create(tempChannel);
+      } catch (Exception e) {
+        // If we have opened the underlying channel but fail to open the compression channel,
+        // we should still close the underlying channel.
+        closeChannelAndThrow(tempChannel, outputFile, e);
+      }
+
+      // The caller shouldn't have to close() this FileBasedWriter if it fails to open(), so close
+      // the channel if prepareWrite() or writeHeader() fails.
+      String step = "";
       try {
+        LOG.debug("Preparing write to {}.", outputFile);
         prepareWrite(channel);
-        LOG.debug("Writing header to {}.", filename);
+
+        LOG.debug("Writing header to {}.", outputFile);
         writeHeader();
       } catch (Exception e) {
-        // The caller shouldn't have to close() this Writer if it fails to open(), so close the
-        // channel if prepareWrite() or writeHeader() fails.
-        try {
-          LOG.error("Writing header to {} failed, closing channel.", filename);
-          channel.close();
-        } catch (IOException closeException) {
-          // Log exception and mask it.
-          LOG.error("Closing channel for {} failed: {}", filename, closeException.getMessage());
-        }
-        // Throw the exception that caused the write to fail.
-        throw e;
+        LOG.error("Beginning write to {} failed, closing channel.", step, outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
       }
-      LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
+
+      LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile);
     }
 
     public final void cleanup() throws Exception {
-      if (filename != null) {
-        IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
+      // outputFile may be null if open() was not called or if it failed.
+      if (outputFile != null) {
+        FileSystems.delete(
+            Collections.singletonList(outputFile), StandardMoveOptions.IGNORE_MISSING_FILES);
       }
     }
 
-    /**
-     * Closes the channel and returns the bundle result.
-     */
+    /** Closes the channel and returns the bundle result. */
     public final FileResult close() throws Exception {
-      try (WritableByteChannel theChannel = channel) {
-        LOG.debug("Writing footer to {}.", filename);
+      checkState(
+          outputFile != null, "FileBasedWriter.close cannot be called with a null outputFile");
+
+      LOG.debug("Writing footer to {}.", outputFile);
+      try {
         writeFooter();
-        LOG.debug("Finishing write to {}.", filename);
+      } catch (Exception e) {
+        LOG.error("Writing footer to {} failed, closing channel.", outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
+      }
+
+      LOG.debug("Finishing write to {}.", outputFile);
+      try {
         finishWrite();
-        if (!channel.isOpen()) {
-          throw new IllegalStateException("Channel should only be closed by its owner: " + channel);
-        }
+      } catch (Exception e) {
+        LOG.error("Finishing write to {} failed, closing channel.", outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
       }
 
-      FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy;
-      String destinationFile;
+      checkState(
+          channel.isOpen(),
+          "Channel %s to %s should only be closed by its owner", channel, outputFile);
+
+      LOG.debug("Closing channel to {}.", outputFile);
+      try {
+        channel.close();
+      } catch (Exception e) {
+        throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
+      }
+
+      FileBasedSink<T> sink = getWriteOperation().getSink();
+      ResourceId outputDirectory = sink.getBaseOutputDirectoryProvider().get();
+      FilenamePolicy filenamePolicy = sink.filenamePolicy;
+      String extension = sink.getExtension();
+      @Nullable ResourceId destinationFile;
       if (window != null) {
-        destinationFile = filenamePolicy.windowedFilename(new WindowedContext(
-            window, paneInfo, shard, numShards));
+        destinationFile = filenamePolicy.windowedFilename(outputDirectory, new WindowedContext(
+            window, paneInfo, shard, numShards), extension);
+      } else if (numShards > 0) {
+        destinationFile = filenamePolicy.unwindowedFilename(
+            outputDirectory, new Context(shard, numShards), extension);
       } else {
-        destinationFile =  filenamePolicy.unwindowedFilename(new Context(shard, numShards));
+        // Destination filename to be generated in the next step.
+        destinationFile = null;
       }
-      FileResult result = new FileResult(filename, destinationFile);
-      LOG.debug("Result for bundle {}: {} {}", this.id, filename, destinationFile);
+      FileResult result = new FileResult(outputFile, destinationFile);
+      LOG.debug("Result for bundle {}: {} {}", this.id, outputFile, destinationFile);
       return result;
     }
 
@@ -955,33 +903,44 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /**
-   * Result of a single bundle write. Contains the filename of the bundle.
+   * Result of a single bundle write. Contains the filename produced by the bundle, and if known
+   * the final output filename.
    */
-  public static final class FileResult {
-    private final String filename;
-    private final String destinationFilename;
+  public static final class FileResult implements Serializable {
+    private final ResourceId filename;
+    @Nullable private final ResourceId destinationFilename;
 
-    public FileResult(String filename, String destinationFilename) {
+    public FileResult(ResourceId filename, @Nullable ResourceId destinationFilename) {
       this.filename = filename;
       this.destinationFilename = destinationFilename;
     }
 
-    public String getFilename() {
+    public ResourceId getFilename() {
       return filename;
     }
 
-    public String getDestinationFilename() {
+    /**
+     * The final filename the temporary file will be copied to. Will be null if the output
+     * filename is unknown because the number of shards is determined dynamically by the runner.
+     */
+    @Nullable public ResourceId getDestinationFilename() {
       return destinationFilename;
     }
 
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(FileResult.class)
+          .add("filename", filename)
+          .add("destinationFilename", destinationFilename)
+          .toString();
+    }
   }
 
   /**
-   * A coder for FileResult objects.
+   * A coder for {@link FileResult} objects.
    */
   public static final class FileResultCoder extends CustomCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
-    private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+    private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 
     public static FileResultCoder of() {
       return INSTANCE;
@@ -993,25 +952,33 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getFilename(), outStream, context.nested());
-      stringCoder.encode(value.getDestinationFilename(), outStream, context.nested());
+      stringCoder.encode(value.getFilename().toString(), outStream, context.nested());
+      if (value.getDestinationFilename() == null) {
+        stringCoder.encode(null, outStream, context);
+      } else {
+        stringCoder.encode(value.getDestinationFilename().toString(), outStream, context);
+      }
     }
 
     @Override
     public FileResult decode(InputStream inStream, Context context)
         throws IOException {
+      String filename = stringCoder.decode(inStream, context.nested());
+      assert filename != null;  // fixes a compiler warning
+      @Nullable String destinationFilename = stringCoder.decode(inStream, context);
       return new FileResult(
-          stringCoder.decode(inStream, context.nested()),
-          stringCoder.decode(inStream, context.nested()));
+          FileSystems.matchNewResource(filename, false /* isDirectory */),
+          destinationFilename == null
+              ? null
+              : FileSystems.matchNewResource(destinationFilename, false /* isDirectory */));
     }
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this, "TableRows are not deterministic.");
+      stringCoder.verifyDeterministic();
     }
   }
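
A round-trip sketch for the rewritten coder (the path is a placeholder; note
that a null destination survives encoding thanks to the NullableCoder):

    FileResult original = new FileResult(
        FileSystems.matchNewResource("/tmp/out/.temp/bundle-0", false /* isDirectory */),
        null /* destination assigned during finalize */);
    byte[] bytes = CoderUtils.encodeToByteArray(FileResultCoder.of(), original);
    FileResult roundTripped = CoderUtils.decodeFromByteArray(FileResultCoder.of(), bytes);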
 
-
   /**
    * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
    * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that