Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/04 00:45:23 UTC

[1/4] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

Repository: beam
Updated Branches:
  refs/heads/master b4bafd092 -> 1bc50d627


http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index ea0395d..a5dacd1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.assertThat;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -39,12 +38,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-
 import java.util.concurrent.ThreadLocalRandom;
-
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.SimpleSink.SimpleWriter;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -149,7 +148,8 @@ public class WriteFilesTest {
   }
 
   private String getBaseOutputFilename() {
-    return appendToTempFolder("baseoutput");
+    return getBaseOutputDirectory()
+        .resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
   }
 
   /**
@@ -188,6 +188,15 @@ public class WriteFilesTest {
         Optional.of(1));
   }
 
+  private ResourceId getBaseOutputDirectory() {
+    return LocalResources.fromFile(tmpFolder.getRoot(), true)
+        .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY);
+
+  }
+  private SimpleSink makeSimpleSink() {
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "simple");
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testCustomShardedWrite() throws IOException {
@@ -204,7 +213,7 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
@@ -270,7 +279,7 @@ public class WriteFilesTest {
 
   @Test
   public void testBuildWrite() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
@@ -293,7 +302,7 @@ public class WriteFilesTest {
 
   @Test
   public void testDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -308,7 +317,7 @@ public class WriteFilesTest {
 
   @Test
   public void testShardedDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -323,7 +332,7 @@ public class WriteFilesTest {
 
   @Test
   public void testCustomShardStrategyDisplayData() {
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "") {
+    SimpleSink sink = new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", "") {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         builder.add(DisplayData.item("foo", "bar"));
@@ -354,7 +363,7 @@ public class WriteFilesTest {
    * methods on a test sink in the correct order, as well as verifies that the elements of a
    * PCollection are written to the sink.
    */
-  private static void runWrite(
+  private void runWrite(
       List<String> inputs, PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName) throws IOException {
     runShardedWrite(inputs, transform, baseName, Optional.<Integer>absent());
@@ -366,7 +375,7 @@ public class WriteFilesTest {
    * verifies that the elements of a PCollection are written to the sink. If numConfiguredShards
    * is not null, also verifies that the output number of shards is correct.
    */
-  private static void runShardedWrite(
+  private void runShardedWrite(
       List<String> inputs,
       PTransform<PCollection<String>, PCollection<String>> transform,
       String baseName,
@@ -382,7 +391,7 @@ public class WriteFilesTest {
       timestamps.add(i + 1);
     }
 
-    SimpleSink sink = new SimpleSink(baseName, "");
+    SimpleSink sink = makeSimpleSink();
     WriteFiles<String> write = WriteFiles.to(sink);
     if (numConfiguredShards.isPresent()) {
       write = write.withNumShards(numConfiguredShards.get());
@@ -399,8 +408,10 @@ public class WriteFilesTest {
                                 Optional<Integer> numExpectedShards) throws IOException {
     List<File> outputFiles = Lists.newArrayList();
     final String pattern = baseName + "*";
-    for (String outputFileName : IOChannelUtils.getFactory(pattern).match(pattern)) {
-      outputFiles.add(new File(outputFileName));
+    List<Metadata> metadata =
+        FileSystems.match(Collections.singletonList(pattern)).get(0).metadata();
+    for (Metadata meta : metadata) {
+      outputFiles.add(new File(meta.resourceId().toString()));
     }
     if (numExpectedShards.isPresent()) {
       assertEquals(numExpectedShards.get().intValue(), outputFiles.size());

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
index 4d58424..9ad4152 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsPathValidator.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 
@@ -44,12 +45,11 @@ public class GcsPathValidator implements PathValidator {
    * is well formed.
    */
   @Override
-  public String validateInputFilePatternSupported(String filepattern) {
+  public void validateInputFilePatternSupported(String filepattern) {
     GcsPath gcsPath = getGcsPath(filepattern);
-    checkArgument(gcpOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject()));
-    String returnValue = verifyPath(filepattern);
+    checkArgument(GcsUtil.isGcsPatternSupported(gcsPath.getObject()));
+    verifyPath(filepattern);
     verifyPathIsAccessible(filepattern, "Could not find file %s");
-    return returnValue;
   }
 
   /**
@@ -57,10 +57,18 @@ public class GcsPathValidator implements PathValidator {
    * is well formed.
    */
   @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    String returnValue = verifyPath(filePrefix);
+  public void validateOutputFilePrefixSupported(String filePrefix) {
+    verifyPath(filePrefix);
     verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s");
-    return returnValue;
+  }
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {
+    checkArgument(
+        resourceId.getScheme().equals("gs"),
+        "Expected a valid 'gs://' path but was given: '%s'",
+        resourceId);
+    verifyPath(resourceId.toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index c41d6bc..a34fb0f 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -21,19 +21,19 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
-
 import java.nio.charset.Charset;
-
 import javax.annotation.Nullable;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.ValidationEventHandler;
-
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CompressedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -450,7 +450,7 @@ public class XmlIO {
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
-    abstract String getFilenamePrefix();
+    abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     @Nullable
     abstract Class<T> getRecordClass();
@@ -465,7 +465,7 @@ public class XmlIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(String baseOutputFilename);
+      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> prefix);
 
       abstract Builder<T> setRecordClass(Class<T> recordClass);
 
@@ -482,8 +482,9 @@ public class XmlIO {
      * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
      * the number of output bundles.
      */
-    public Write<T> toFilenamePrefix(String filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write<T> to(String filenamePrefix) {
+      ResourceId resourceId = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+      return toBuilder().setFilenamePrefix(StaticValueProvider.of(resourceId)).build();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 6f87d75..963ab1b 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -24,7 +24,10 @@ import java.nio.channels.WritableByteChannel;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.ShardNameTemplate;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -32,12 +35,17 @@ import org.apache.beam.sdk.util.MimeTypes;
 
 /** Implementation of {@link XmlIO#write}. */
 class XmlSink<T> extends FileBasedSink<T> {
-  protected static final String XML_EXTENSION = "xml";
+  private static final String XML_EXTENSION = ".xml";
 
   private final XmlIO.Write<T> spec;
 
+  private static DefaultFilenamePolicy makeFilenamePolicy(XmlIO.Write<?> spec) {
+    return DefaultFilenamePolicy.constructUsingStandardParameters(
+        spec.getFilenamePrefix(), ShardNameTemplate.INDEX_OF_MAX, XML_EXTENSION);
+  }
+
   XmlSink(XmlIO.Write<T> spec) {
-    super(spec.getFilenamePrefix(), XML_EXTENSION);
+    super(spec.getFilenamePrefix(), makeFilenamePolicy(spec));
     this.spec = spec;
   }
 
@@ -79,7 +87,7 @@ class XmlSink<T> extends FileBasedSink<T> {
      * Creates a {@link XmlWriter} with a marshaller for the type it will write.
      */
     @Override
-    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
+    public XmlWriter<T> createWriter() throws Exception {
       JAXBContext context;
       Marshaller marshaller;
       context = JAXBContext.newInstance(getSink().spec.getRecordClass());
@@ -99,7 +107,7 @@ class XmlSink<T> extends FileBasedSink<T> {
     }
 
     @VisibleForTesting
-    String getTemporaryDirectory() {
+    ResourceId getTemporaryDirectory() {
       return this.tempDirectory.get();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index 7f9a8c5..aa0c1c3 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.xml;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -62,7 +63,7 @@ public class XmlSinkTest {
   public ExpectedException thrown = ExpectedException.none();
 
   private String testRootElement = "testElement";
-  private String testFilePrefix = "/path/to/testPrefix";
+  private String testFilePrefix = "/path/to/file";
 
   /**
    * An XmlWriter correctly writes objects as Xml elements with an enclosing root element.
@@ -72,12 +73,12 @@ public class XmlSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
 
     List<Bird> bundle =
         Lists.newArrayList(new Bird("bemused", "robin"), new Bird("evasive", "goose"));
@@ -89,16 +90,15 @@ public class XmlSinkTest {
 
   @Test
   public void testXmlWriterCharset() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
     XmlWriteOperation<Bird> writeOp =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement("birds")
             .withCharset(StandardCharsets.ISO_8859_1)
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
 
     List<Bird> bundle = Lists.newArrayList(new Bird("bréche", "pinçon"));
     List<String> lines = Arrays.asList("<birds>", "<bird>", "<species>pinçon</species>",
@@ -113,12 +113,15 @@ public class XmlSinkTest {
   public void testBuildXmlWriteTransform() {
     XmlIO.Write<Bird> write =
         XmlIO.<Bird>write()
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement);
     assertEquals(Bird.class, write.getRecordClass());
     assertEquals(testRootElement, write.getRootElement());
-    assertEquals(testFilePrefix, write.getFilenamePrefix());
+    assertNotNull(write.getFilenamePrefix());
+    assertThat(
+        write.getFilenamePrefix().toString(),
+        containsString(testFilePrefix));
   }
 
   /** Validation ensures no fields are missing. */
@@ -126,19 +129,21 @@ public class XmlSinkTest {
   public void testValidateXmlSinkMissingRecordClass() {
     thrown.expect(NullPointerException.class);
     XmlIO.<Bird>write()
+        .to(testFilePrefix)
         .withRootElement(testRootElement)
-        .toFilenamePrefix(testFilePrefix)
         .validate(null);
   }
 
   @Test
   public void testValidateXmlSinkMissingRootElement() {
     thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write().withRecordClass(Bird.class).toFilenamePrefix(testFilePrefix).validate(null);
+    XmlIO.<Bird>write().withRecordClass(Bird.class)
+        .to(testFilePrefix)
+        .validate(null);
   }
 
   @Test
-  public void testValidateXmlSinkMissingFilePrefix() {
+  public void testValidateXmlSinkMissingOutputDirectory() {
     thrown.expect(NullPointerException.class);
     XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
   }
@@ -151,16 +156,15 @@ public class XmlSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     XmlSink<Bird> sink =
         XmlIO.<Bird>write()
+            .to(testFilePrefix)
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
             .createSink();
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation();
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writeOp.getTemporaryDirectory()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+    Path tempPath = new File(writeOp.getTemporaryDirectory().toString()).toPath();
+    assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+    assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
   }
 
   /**
@@ -173,28 +177,28 @@ public class XmlSinkTest {
         XmlIO.<Bird>write()
             .withRecordClass(Bird.class)
             .withRootElement(testRootElement)
-            .toFilenamePrefix(testFilePrefix)
+            .to(testFilePrefix)
             .createSink()
             .createWriteOperation();
-    XmlWriter<Bird> writer = writeOp.createWriter(options);
+    XmlWriter<Bird> writer = writeOp.createWriter();
     Path outputPath = new File(testFilePrefix).toPath();
-    Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory()).toPath();
-    assertEquals(outputPath.getParent(), tempPath.getParent());
-    assertThat(
-        tempPath.getFileName().toString(), containsString("temp-beam-" + outputPath.getFileName()));
+    Path tempPath = new File(writer.getWriteOperation().getTemporaryDirectory().toString())
+        .toPath();
+    assertThat(tempPath.getParent(), equalTo(outputPath.getParent()));
+    assertThat(tempPath.getFileName().toString(), containsString("temp-beam-"));
     assertNotNull(writer.marshaller);
   }
 
   @Test
   public void testDisplayData() {
     XmlIO.Write<Integer> write = XmlIO.<Integer>write()
-        .toFilenamePrefix("foobar")
+        .to(testFilePrefix)
         .withRootElement("bird")
         .withRecordClass(Integer.class);
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("fileNamePattern", "foobar-SSSSS-of-NNNNN.xml"));
+    assertThat(displayData, hasDisplayItem("filenamePattern", "file-SSSSS-of-NNNNN.xml"));
     assertThat(displayData, hasDisplayItem("rootElement", "bird"));
     assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
   }
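
For reference, the reworked XmlIO write configuration exercised by these tests
(Bird and testFilePrefix are defined in XmlSinkTest above):

    XmlIO.<Bird>write()
        .to(testFilePrefix)            // replaces toFilenamePrefix(...)
        .withRecordClass(Bird.class)
        .withRootElement("birds");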


[3/4] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

Posted by dh...@apache.org.
Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

This converts FileBasedSink from IOChannelFactory to FileSystems, with
fallout changes on all existing Transforms that use WriteFiles.

We preserve the existing semantics of most transforms, simply adding the
ability for users to provide ResourceId in addition to String when
setting the outputPrefix.
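
For example (a minimal sketch: `lines` is a placeholder PCollection<String> and
the local path is illustrative), both spellings configure the same write, as the
WriteOneFilePerWindow example below does with a ResourceId:

    import org.apache.beam.sdk.io.FileBasedSink;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.fs.ResourceId;

    // As before, the output prefix can be given as a plain String:
    lines.apply(TextIO.write().to("/tmp/output/file"));

    // Or as a ResourceId; convertToFileResourceIfPossible interprets the
    // String as a file rather than a directory when it can:
    ResourceId prefix =
        FileBasedSink.convertToFileResourceIfPossible("/tmp/output/file");
    lines.apply(TextIO.write().to(prefix));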

Other changes:

* Rethink FilenamePolicy as a function from ResourceId (base directory)
  to ResourceId (output file), moving the base directory into the
  context. This way, FilenamePolicy logic is truly independent of the
  base directory. Using ResourceId#resolve, a filename policy can add
  multiple path components, say, base/YYYY/MM/DD/file.txt, in a
  filesystem-independent way (a sketch of such a policy follows this list).

  (Also add an optional extension parameter to the function, enabling an
  owning transform to pass in the suffix from a separately-configured
  compression factory or similar.)

* Make DefaultFilenamePolicy its own top-level class and move
  IOChannelUtils#constructName into it. This is the default FilenamePolicy
  used by FileBasedSink.
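
As a sketch of the FilenamePolicy rework (not code from this commit, and
assuming the new signatures shown in the WriteOneFilePerWindow diff below;
DatePartitionedFiles and its naming scheme are invented for illustration), a
policy that date-partitions its output can resolve one path component at a
time:

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;

    class DatePartitionedFiles extends FilenamePolicy {
      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext context, String extension) {
        DateTime day =
            ((IntervalWindow) context.getWindow()).start().toDateTime(DateTimeZone.UTC);
        // Builds base/YYYY/MM/DD/file-S-of-N<ext>, one resolve() per component.
        return outputDirectory
            .resolve(String.format("%04d", day.getYear()),
                StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(String.format("%02d", day.getMonthOfYear()),
                StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(String.format("%02d", day.getDayOfMonth()),
                StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(
                String.format("file-%s-of-%s%s",
                    context.getShardNumber(), context.getNumShards(), extension),
                StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Only windowed writes are supported.");
      }
    }

The default policy is constructed from an output directory, a shard template,
and a suffix, as exercised by the updated tests in this commit:

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.io.LocalResources;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    ResourceId outputDirectory =
        LocalResources.fromString("/foo/bar", true /* isDirectory */);
    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
        StaticValueProvider.of(outputDirectory),
        DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE,
        "" /* suffix */);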


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/17358248
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/17358248
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/17358248

Branch: refs/heads/master
Commit: 17358248f8acbfa3c91e91b7ad80a9e0edb7e782
Parents: b4bafd0
Author: Dan Halperin <dh...@google.com>
Authored: Tue Apr 25 10:10:28 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 17:44:59 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |  32 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   4 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 213 ++++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  22 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 169 ++++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 593 +++++++++----------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 126 ++--
 .../java/org/apache/beam/sdk/io/TextIO.java     | 249 ++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  20 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  15 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    |   6 +-
 .../apache/beam/sdk/util/NoopPathValidator.java |  12 +-
 .../beam/sdk/util/NumberedShardedFile.java      |   4 +-
 .../org/apache/beam/sdk/util/PathValidator.java |  15 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  55 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 330 +++++------
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  27 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  45 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  41 +-
 .../apache/beam/sdk/util/GcsPathValidator.java  |  22 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  15 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  16 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |  52 +-
 27 files changed, 1251 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 461b46d..ed35c8a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.examples.common;
 
+import static com.google.common.base.Verify.verifyNotNull;
+
+import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -47,9 +50,21 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
 
   @Override
   public PDone expand(PCollection<String> input) {
+    // filenamePrefix may contain a directory and a filename component. Pull out only the filename
+    // component from that path for the PerWindowFiles.
+    String prefix = "";
+    ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+    if (!resource.isDirectory()) {
+      prefix = verifyNotNull(
+          resource.getFilename(),
+          "A non-directory resource should have a non-null filename: %s",
+          resource);
+    }
+
     return input.apply(
         TextIO.write()
-            .to(new PerWindowFiles(filenamePrefix))
+            .to(resource.getCurrentDirectory())
+            .withFilenamePolicy(new PerWindowFiles(prefix))
             .withWindowedWrites()
             .withNumShards(3));
   }
@@ -62,32 +77,31 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
    */
   public static class PerWindowFiles extends FilenamePolicy {
 
-    private final String output;
+    private final String prefix;
 
-    public PerWindowFiles(String output) {
-      this.output = output;
-    }
-
-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(output);
+    public PerWindowFiles(String prefix) {
+      this.prefix = prefix;
     }
 
-    public String   filenamePrefixForWindow(IntervalWindow window) {
-      return String.format(
-          "%s-%s-%s", output, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    public String filenamePrefixForWindow(IntervalWindow window) {
+      return String.format("%s-%s-%s",
+          prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
     }
 
     @Override
-    public String windowedFilename(WindowedContext context) {
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext context, String extension) {
       IntervalWindow window = (IntervalWindow) context.getWindow();
-      return String.format(
-          "%s-%s-of-%s",
-          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards());
+      String filename = String.format(
+          "%s-%s-of-%s%s",
+          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public String unwindowedFilename(Context context) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context context, String extension) {
       throw new UnsupportedOperationException("Unsupported.");
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 33ba80c..e7d4c64 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -29,8 +29,13 @@ import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -498,9 +503,12 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void writeWithRunnerDeterminedSharding() {
+    ResourceId outputDirectory = LocalResources.fromString("/foo/bar", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
     WriteFiles<Integer> write =
         WriteFiles.to(
-            new FileBasedSink<Integer>("foo", "bar") {
+            new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy) {
               @Override
               public FileBasedWriteOperation<Integer> createWriteOperation() {
                 return null;

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 53d2ba3..18940d2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -39,10 +39,14 @@ import java.util.UUID;
 import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.LocalResources;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -121,7 +125,17 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    WriteFiles<Object> original = WriteFiles.to(new TestSink());
+    ResourceId outputDirectory = LocalResources.fromString("/foo", true /* isDirectory */);
+    FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
+        StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
+    WriteFiles<Object> original = WriteFiles.to(
+        new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy) {
+          @Override
+          public FileBasedWriteOperation<Object> createWriteOperation() {
+            throw new IllegalArgumentException("Should not be used");
+          }
+        });
+    @SuppressWarnings("unchecked")
     PCollection<Object> objs = (PCollection) p.apply(Create.empty(VoidCoder.of()));
 
     AppliedPTransform<PCollection<Object>, PDone, WriteFiles<Object>> originalApplication =
@@ -206,18 +220,4 @@ public class WriteWithShardingFactoryTest {
     List<Integer> shards = fnTester.processBundle((long) count);
     assertThat(shards, containsInAnyOrder(13));
   }
-
-  private static class TestSink extends FileBasedSink<Object> {
-    public TestSink() {
-      super("", "");
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {}
-
-    @Override
-    public FileBasedWriteOperation<Object> createWriteOperation() {
-      throw new IllegalArgumentException("Should not be used");
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0a4a151..f7455b3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -87,12 +87,15 @@ import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -844,11 +847,12 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      FileBasedSink<T> sink = transform.getSink();
-      if (sink.getBaseOutputFilenameProvider().isAccessible()) {
+      ValueProvider<ResourceId> outputDirectory =
+          transform.getSink().getBaseOutputDirectoryProvider();
+      if (outputDirectory.isAccessible()) {
         PathValidator validator = runner.options.getPathValidator();
-        validator.validateOutputFilePrefixSupported(
-            sink.getBaseOutputFilenameProvider().get());
+        validator.validateOutputResourceSupported(
+            outputDirectory.get().resolve("some-file", StandardResolveOptions.RESOLVE_FILE));
       }
       return transform.expand(input);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index fa106ac..c0dfbee 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -245,6 +245,10 @@ public class DataflowRunnerTest {
     options.setDataflowClient(buildMockDataflow());
     options.setGcsUtil(mockGcsUtil);
     options.setGcpCredential(new TestCredential());
+
+    // Configure the FileSystem registrar to use these options.
+    FileSystems.setDefaultConfigInWorkers(options);
+
     return options;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 3bb61a2..c7e7233 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableMap;
@@ -34,8 +35,12 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.Read.Bounded;
-import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.values.PBegin;
@@ -46,7 +51,7 @@ import org.apache.beam.sdk.values.PDone;
  * {@link PTransform}s for reading and writing Avro files.
  *
  * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()},
- * specifying {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
+ * using {@link AvroIO.Read#from} to specify the filename or filepattern to read from.
  * See {@link FileSystems} for information on supported file systems and filepatterns.
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}.
@@ -70,12 +75,14 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * } </pre>
  *
- * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, specifying
- * {@code AvroIO.write().to(String)} to specify the filename or sharded filepattern to write to.
- * See {@link FileSystems} for information on supported file systems and {@link ShardNameTemplate}
- * for information on naming of output files. You can also use {@code AvroIO.write()} with
- * {@link Write#to(FileBasedSink.FilenamePolicy)} to
- * specify a custom file naming policy.
+ * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
+ * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default
+ * {@link DefaultFilenamePolicy} will use this prefix, in conjunction with a
+ * {@link ShardNameTemplate} (set via {@link Write#withShardNameTemplate(String)}) and optional
 + * filename suffix (set via {@link Write#withSuffix(String)}), to generate output filenames in a
+ * sharded way. You can override this default write filename policy using
+ * {@link Write#withFilenamePolicy(FileBasedSink.FilenamePolicy)} to specify a custom file naming
+ * policy.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
@@ -109,11 +116,6 @@ import org.apache.beam.sdk.values.PDone;
  * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the
  * {@link org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can
  * be changed or overridden using {@link AvroIO.Write#withCodec}.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for
- * more details.
  */
 public class AvroIO {
   /**
@@ -172,9 +174,9 @@ public class AvroIO {
 
   private static <T> Write.Builder<T> defaultWriteBuilder() {
     return new AutoValue_AvroIO_Write.Builder<T>()
-        .setFilenameSuffix("")
+        .setFilenameSuffix(null)
+        .setShardTemplate(null)
         .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
         .setCodec(Write.DEFAULT_CODEC)
         .setMetadata(ImmutableMap.<String, Object>of())
         .setWindowedWrites(false);
@@ -246,23 +248,16 @@ public class AvroIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
-    /**
-     * A {@link PTransform} that writes a bounded {@link PCollection} to an Avro file (or
-     * multiple Avro files matching a sharding pattern).
-     *
-     * @param <T> the type of each of the elements of the input PCollection
-     */
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
     private static final SerializableAvroCodecFactory DEFAULT_CODEC =
         new SerializableAvroCodecFactory(CodecFactory.deflateCodec(6));
     // This should be a multiple of 4 to not get a partial encoded byte.
     private static final int METADATA_BYTES_MAX_LENGTH = 40;
 
-    @Nullable abstract String getFilenamePrefix();
-    abstract String getFilenameSuffix();
+    @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
+    @Nullable abstract String getShardTemplate();
+    @Nullable abstract String getFilenameSuffix();
     abstract int getNumShards();
-    abstract String getShardTemplate();
-    abstract Class<T> getRecordClass();
+    @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getWindowedWrites();
     @Nullable abstract FilenamePolicy getFilenamePolicy();
@@ -278,7 +273,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setFilenamePrefix(String filenamePrefix);
+      abstract Builder<T> setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
       abstract Builder<T> setFilenameSuffix(String filenameSuffix);
       abstract Builder<T> setNumShards(int numShards);
       abstract Builder<T> setShardTemplate(String shardTemplate);
@@ -293,54 +288,106 @@ public class AvroIO {
     }
 
     /**
-     * Writes to the file(s) with the given prefix. See {@link FileSystems} for information on
+     * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
+     * supported file systems.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
 +     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     */
+    public Write<T> to(String outputPrefix) {
+      return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+    }
+
+    /**
+     * Writes to file(s) with the given output prefix. See {@link FileSystems} for information on
      * supported file systems.
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards}, and end
-     * in a common extension, if given by {@link #withSuffix}.
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will build output filenames using the
 +     * specified prefix, a shard name template (see {@link #withShardNameTemplate(String)}), and
+     * a common suffix (if supplied using {@link #withSuffix(String)}). This default can be
+     * overridden using {@link #withFilenamePolicy(FilenamePolicy)}.
+     */
+    public Write<T> to(ResourceId outputPrefix) {
+      return toResource(StaticValueProvider.of(outputPrefix));
+    }
+
+    /**
+     * Like {@link #to(String)}.
+     */
+    public Write<T> to(ValueProvider<String> outputPrefix) {
+      return toResource(NestedValueProvider.of(outputPrefix,
+          new SerializableFunction<String, ResourceId>() {
+            @Override
+            public ResourceId apply(String input) {
+              return FileBasedSink.convertToFileResourceIfPossible(input);
+            }
+          }));
+    }
+
+    /**
+     * Like {@link #to(ResourceId)}.
      */
-    public Write<T> to(String filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write<T> toResource(ValueProvider<ResourceId> outputPrefix) {
+      return toBuilder().setFilenamePrefix(outputPrefix).build();
     }
 
-    /** Writes to the file(s) specified by the provided {@link FileBasedSink.FilenamePolicy}. */
-    public Write<T> to(FilenamePolicy filenamePolicy) {
+    /**
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+     */
+    public Write<T> withFilenamePolicy(FilenamePolicy filenamePolicy) {
       return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
-     * Writes to the file(s) with the given filename suffix.
+     * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
+     *
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
+     */
+    public Write<T> withShardNameTemplate(String shardTemplate) {
+      return toBuilder().setShardTemplate(shardTemplate).build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. This option may only be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
      */
     public Write<T> withSuffix(String filenameSuffix) {
       return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
-     * Uses the provided shard count. See {@link ShardNameTemplate} for a description of shard
-     * templates.
+     * Configures the number of output shards produced overall (when using unwindowed writes) or
+     * per-window (when using windowed writes).
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
+     * @param numShards the number of shards to use, or 0 to let the system decide.
      */
     public Write<T> withNumShards(int numShards) {
       checkArgument(numShards >= 0);
       return toBuilder().setNumShards(numShards).build();
     }
 
-    /** Uses the given {@link ShardNameTemplate} for naming output files. */
-    public Write<T> withShardNameTemplate(String shardTemplate) {
-      return toBuilder().setShardTemplate(shardTemplate).build();
-    }
-
     /**
-     * Forces a single file as output.
 +     * Forces a single file as output and an empty shard name template. This option is only compatible
+     * with unwindowed writes.
+     *
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
      * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
@@ -351,9 +398,9 @@ public class AvroIO {
     /**
      * Preserves windowing of input elements and writes them to files based on the element's window.
      *
-     * <p>Requires use of {@link #to(FileBasedSink.FilenamePolicy)}. Filenames will be generated
-     * using {@link FilenamePolicy#windowedFilename(FileBasedSink.FilenamePolicy.WindowedContext)}.
-     * See also {@link WriteFiles#withWindowedWrites()}.
+     * <p>Requires use of {@link #withFilenamePolicy(FileBasedSink.FilenamePolicy)}. Filenames will
+     * be generated using {@link FilenamePolicy#windowedFilename}. See also
+     * {@link WriteFiles#withWindowedWrites()}.
      */
     public Write<T> withWindowedWrites() {
       return toBuilder().setWindowedWrites(true).build();
@@ -386,36 +433,28 @@ public class AvroIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
-      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of an AvroIO.Write transform");
-      }
-      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
-        throw new IllegalStateException(
-            "cannot set both a filename policy and a filename prefix");
-      }
-      if (getSchema() == null) {
-        throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of an AvroIO.Write transform.");
+      checkState(
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or suffix.");
+      checkState(getSchema() != null,
+          "Need to set the schema of an AvroIO.Write transform.");
+
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
       }
 
-      WriteFiles<T> write = null;
-      if (getFilenamePolicy() != null) {
-        write = WriteFiles.to(
-            new AvroSink<>(
-                getFilenamePolicy(),
-                AvroCoder.of(getRecordClass(), getSchema()),
-                getCodec(),
-                getMetadata()));
-      } else {
-        write = WriteFiles.to(
+      WriteFiles<T> write = WriteFiles.to(
             new AvroSink<>(
                 getFilenamePrefix(),
-                getFilenameSuffix(),
-                getShardTemplate(),
+                usedFilenamePolicy,
                 AvroCoder.of(getRecordClass(), getSchema()),
                 getCodec(),
                 getMetadata()));
-      }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -428,17 +467,25 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
+      checkState(
+          getFilenamePrefix() != null,
+          "Unable to populate DisplayData for invalid AvroIO.Write (unset output prefix).");
+      String outputPrefixString = null;
+      if (getFilenamePrefix().isAccessible()) {
+        ResourceId dir = getFilenamePrefix().get();
+        outputPrefixString = dir.toString();
+      } else {
+        outputPrefixString = getFilenamePrefix().toString();
+      }
       builder
           .add(DisplayData.item("schema", getRecordClass())
             .withLabel("Record Schema"))
-          .addIfNotNull(DisplayData.item("filePrefix", getFilenamePrefix())
+          .addIfNotNull(DisplayData.item("filePrefix", outputPrefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-              .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-              .withLabel("Output File Suffix"),
-              "")
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+              .withLabel("Output Shard Name Template"))
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+              .withLabel("Output File Suffix"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"),
               0)
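
A usage sketch for the reworked AvroIO.Write (schema and record-class setup,
unchanged by this commit, is elided; `records`, the GenericRecord type
parameter, and the output path are placeholders):

    AvroIO.Write<GenericRecord> write = ...;  // schema configured elsewhere
    records.apply(
        write.to("/path/to/output/records")
            .withShardNameTemplate(ShardNameTemplate.INDEX_OF_MAX)
            .withSuffix(".avro")
            .withNumShards(3));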

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 46bb4f3..7d42574 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -27,7 +27,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /** A {@link FileBasedSink} for Avro files. */
@@ -37,24 +38,13 @@ class AvroSink<T> extends FileBasedSink<T> {
   private final ImmutableMap<String, Object> metadata;
 
   AvroSink(
+      ValueProvider<ResourceId> outputPrefix,
       FilenamePolicy filenamePolicy,
       AvroCoder<T> coder,
       SerializableAvroCodecFactory codec,
       ImmutableMap<String, Object> metadata) {
-    super(filenamePolicy);
-    this.coder = coder;
-    this.codec = codec;
-    this.metadata = metadata;
-  }
-
-  AvroSink(
-      String baseOutputFilename,
-      String extension,
-      String fileNameTemplate,
-      AvroCoder<T> coder,
-      SerializableAvroCodecFactory codec,
-      ImmutableMap<String, Object> metadata) {
-    super(baseOutputFilename, extension, fileNameTemplate);
+    // Avro handles compression internally using the codec.
+    super(outputPrefix, filenamePolicy, CompressionType.UNCOMPRESSED);
     this.coder = coder;
     this.codec = codec;
     this.metadata = metadata;
@@ -82,7 +72,7 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
 
     @Override
-    public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
+    public FileBasedWriter<T> createWriter() throws Exception {
       return new AvroWriter<>(this, coder, codec, metadata);
     }
   }

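For context on the AvroSink change above: Avro data files carry their compression codec inside the container format itself, which is why the sink now hard-codes CompressionType.UNCOMPRESSED for the byte channel. A hedged sketch using the plain Avro library (org.apache.avro; the schema and path are illustrative) of codec-level compression:

    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
    writer.setCodec(CodecFactory.snappyCodec());  // compression inside the container
    writer.create(schema, new java.io.File("/tmp/example.avro"));

Compressing the channel as well would double-compress the bytes and produce files that Avro readers cannot open directly.
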
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
new file mode 100644
index 0000000..07bc2db
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DefaultFilenamePolicy.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/**
+ * A default {@link FilenamePolicy} for unwindowed files. This policy is constructed using three
+ * parameters that together define the output name of a sharded file, in conjunction with the number
+ * of shards and index of the particular file, using {@link #constructName}.
+ *
+ * <p>Most users of unwindowed files will use this {@link DefaultFilenamePolicy}. For more advanced
+ * uses in generating different files for each window and other sharding controls, see the
+ * {@code WriteOneFilePerWindow} example pipeline.
+ */
+public final class DefaultFilenamePolicy extends FilenamePolicy {
+  /** The default sharding name template used in {@link #constructUsingStandardParameters}. */
+  public static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+  // Pattern that matches shard placeholders within a shard template.
+  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
+
+  /**
+   * Constructs a new {@link DefaultFilenamePolicy}.
+   *
+   * @see DefaultFilenamePolicy for more information on the arguments to this function.
+   */
+  @VisibleForTesting
+  DefaultFilenamePolicy(ValueProvider<String> prefix, String shardTemplate, String suffix) {
+    this.prefix = prefix;
+    this.shardTemplate = shardTemplate;
+    this.suffix = suffix;
+  }
+
+  /**
+   * A helper function to construct a {@link DefaultFilenamePolicy} using the standard filename
+   * parameters, namely a provided {@link ResourceId} for the output prefix, and possibly-null
+   * shard name template and suffix.
+   *
+   * <p>Any filename component of the provided resource will be used as the filename prefix.
+   *
+   * <p>If provided, the shard name template will be used; otherwise {@link #DEFAULT_SHARD_TEMPLATE}
+   * will be used.
+   *
+   * <p>If provided, the suffix will be used; otherwise the files will have an empty suffix.
+   */
+  public static DefaultFilenamePolicy constructUsingStandardParameters(
+      ValueProvider<ResourceId> outputPrefix,
+      @Nullable String shardTemplate,
+      @Nullable String filenameSuffix) {
+    return new DefaultFilenamePolicy(
+        NestedValueProvider.of(outputPrefix, new ExtractFilename()),
+        firstNonNull(shardTemplate, DEFAULT_SHARD_TEMPLATE),
+        firstNonNull(filenameSuffix, ""));
+  }
+
+  private final ValueProvider<String> prefix;
+  private final String shardTemplate;
+  private final String suffix;
+
+  /**
+   * Constructs a fully qualified name from components.
+   *
+   * <p>The name is built from a prefix, shard template (with shard numbers
+   * applied), and a suffix.  All components are required, but may be empty
+   * strings.
+   *
+   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
+   * are replaced with the shard number, or number of shards respectively.  The
+   * numbers are formatted with leading zeros to match the length of the
+   * repeated sequence of letters.
+   *
+   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
+   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
+   * produced:  "output-001-of-100.txt".
+   */
+  public static String constructName(
+      String prefix, String shardTemplate, String suffix, int shardNum, int numShards) {
+    // Matcher API works with StringBuffer, rather than StringBuilder.
+    StringBuffer sb = new StringBuffer();
+    sb.append(prefix);
+
+    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
+    while (m.find()) {
+      boolean isShardNum = (m.group(1).charAt(0) == 'S');
+
+      char[] zeros = new char[m.end() - m.start()];
+      Arrays.fill(zeros, '0');
+      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
+      String formatted = df.format(isShardNum ? shardNum : numShards);
+      m.appendReplacement(sb, formatted);
+    }
+    m.appendTail(sb);
+
+    sb.append(suffix);
+    return sb.toString();
+  }
+
+  @Override
+  @Nullable
+  public ResourceId unwindowedFilename(ResourceId outputDirectory, Context context,
+      String extension) {
+    String filename =
+        constructName(
+            prefix.get(), shardTemplate, suffix, context.getShardNumber(), context.getNumShards())
+        + extension;
+    return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  @Override
+  public ResourceId windowedFilename(ResourceId outputDirectory,
+      WindowedContext c, String extension) {
+    throw new UnsupportedOperationException("There is no default policy for windowed file"
+        + " output. Please provide an explicit FilenamePolicy to generate filenames.");
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    String filenamePattern;
+    if (prefix.isAccessible()) {
+      filenamePattern = String.format("%s%s%s", prefix.get(), shardTemplate, suffix);
+    } else {
+      filenamePattern = String.format("%s%s%s", prefix, shardTemplate, suffix);
+    }
+    builder.add(
+        DisplayData.item("filenamePattern", filenamePattern)
+            .withLabel("Filename Pattern"));
+  }
+
+  private static class ExtractFilename implements SerializableFunction<ResourceId, String> {
+    @Override
+    public String apply(ResourceId input) {
+      if (input.isDirectory()) {
+        return "";
+      } else {
+        return firstNonNull(input.getFilename(), "");
+      }
+    }
+  }
+}

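To make the shard-template semantics of the new DefaultFilenamePolicy concrete, a small sketch of constructName (a public static method, so directly callable; values are illustrative):

    // Runs of 'S' expand to the shard number and runs of 'N' to the number of
    // shards, each zero-padded to the length of the run.
    String name = DefaultFilenamePolicy.constructName(
        "output", "-SSS-of-NNN", ".txt", 1, 100);
    // name is "output-001-of-100.txt"
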
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 7ba608c..0daf5dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -21,10 +21,11 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Verify.verifyNotNull;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,26 +33,27 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
-import java.nio.file.Path;
-import java.text.DecimalFormat;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -61,13 +63,12 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
 import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,52 +104,12 @@ import org.slf4j.LoggerFactory;
  * PCollections into separate files per window pane. This allows file output from unbounded
  * PCollections, and also works for bounded PCollections.
  *
- * <p>Supported file systems are those registered with {@link IOChannelUtils}.
+ * <p>Supported file systems are those registered with {@link FileSystems}.
  *
  * @param <T> the type of values written to the sink.
  */
 public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
-  // Pattern that matches shard placeholders within a shard template.
-  private static final Pattern SHARD_FORMAT_RE = Pattern.compile("(S+|N+)");
-
-  /**
-   * Constructs a fully qualified name from components.
-   *
-   * <p>The name is built from a prefix, shard template (with shard numbers
-   * applied), and a suffix.  All components are required, but may be empty
-   * strings.
-   *
-   * <p>Within a shard template, repeating sequences of the letters "S" or "N"
-   * are replaced with the shard number, or number of shards respectively.  The
-   * numbers are formatted with leading zeros to match the length of the
-   * repeated sequence of letters.
-   *
-   * <p>For example, if prefix = "output", shardTemplate = "-SSS-of-NNN", and
-   * suffix = ".txt", with shardNum = 1 and numShards = 100, the following is
-   * produced:  "output-001-of-100.txt".
-   */
-  public static String constructName(String prefix,
-      String shardTemplate, String suffix, int shardNum, int numShards) {
-    // Matcher API works with StringBuffer, rather than StringBuilder.
-    StringBuffer sb = new StringBuffer();
-    sb.append(prefix);
-
-    Matcher m = SHARD_FORMAT_RE.matcher(shardTemplate);
-    while (m.find()) {
-      boolean isShardNum = (m.group(1).charAt(0) == 'S');
-
-      char[] zeros = new char[m.end() - m.start()];
-      Arrays.fill(zeros, '0');
-      DecimalFormat df = new DecimalFormat(String.valueOf(zeros));
-      String formatted = df.format(isShardNum ? shardNum : numShards);
-      m.appendReplacement(sb, formatted);
-    }
-    m.appendTail(sb);
-
-    sb.append(suffix);
-    return sb.toString();
-  }
 
   /**
    * Directly supported file output compression types.
@@ -213,12 +174,32 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /**
+   * This is a helper function for converting a user-provided output filename prefix into a
+   * {@link ResourceId} for writing output files. See {@link TextIO.Write#to(String)} for an
+   * example use case.
+   *
+   * <p>Typically, the input prefix will be something like {@code /tmp/foo/bar}, and the user would
+   * like output files to be named as {@code /tmp/foo/bar-0-of-3.txt}. Thus, this function tries to
+   * interpret the provided string as a file {@link ResourceId} path.
+   *
+   * <p>However, this may fail, for example if the user gives a prefix that is a directory. E.g.,
+   * {@code /}, {@code gs://my-bucket}, or {@code c://}. In that case, interpreting the string as a
+   * file will fail and this function will return a directory {@link ResourceId} instead.
+   */
+  public static ResourceId convertToFileResourceIfPossible(String outputPrefix) {
+    try {
+      return FileSystems.matchNewResource(outputPrefix, false /* isDirectory */);
+    } catch (Exception e) {
+      return FileSystems.matchNewResource(outputPrefix, true /* isDirectory */);
+    }
+  }
+
+  /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
   * underlying channel. The default, {@link CompressionType#UNCOMPRESSED}, does not
   * compress the output.
    */
-  protected final WritableByteChannelFactory writableByteChannelFactory;
-
+  private final WritableByteChannelFactory writableByteChannelFactory;
 
   /**
    * A naming policy for output files.
@@ -294,23 +275,28 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
     /**
      * When a sink has requested windowed or triggered output, this method will be invoked to return
-     * the filename. The {@link WindowedContext} object gives access to the window and pane, as
-     * well as sharding information. The policy must return unique and consistent filenames
+     * the file {@link ResourceId resource} to be created given the base output directory and a
+     * (possibly empty) extension from {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
+     *
+     * <p>The {@link WindowedContext} object gives access to the window and pane,
+     * as well as sharding information. The policy must return unique and consistent filenames
      * for different windows and panes.
      */
-    public abstract String windowedFilename(WindowedContext c);
-
-    /**
-     * When a sink has not requested windowed output, this method will be invoked to return the
-     * filename. The {@link Context} object only provides sharding information, which is used by
-     * the policy to generate unique and consistent filenames.
-     */
-    public abstract String unwindowedFilename(Context c);
+    public abstract ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension);
 
     /**
-     * @return The base filename for all output files.
+     * When a sink has not requested windowed or triggered output, this method will be invoked to
+     * return the file {@link ResourceId resource} to be created given the base output directory and
+     * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
+     *
+     * <p>The {@link Context} object only provides sharding information, which is used by the policy
+     * to generate unique and consistent filenames.
      */
-    public abstract ValueProvider<String> getBaseOutputFilenameProvider();
+    @Nullable public abstract ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context c, String extension);
 
     /**
      * Populates the display data.
@@ -319,129 +305,55 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
+  /** The policy used to generate names of files to be produced. */
+  @VisibleForTesting
+  final FilenamePolicy filenamePolicy;
+  /** The directory to which files will be written. */
+  private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
+
   /**
-   * A default filename policy.
+   * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
    */
-  protected class DefaultFilenamePolicy extends FilenamePolicy {
-    ValueProvider<String> baseOutputFilename;
-    String extension;
-    String fileNamingTemplate;
-
-    public DefaultFilenamePolicy(ValueProvider<String> baseOutputFilename, String extension,
-                                 String fileNamingTemplate) {
-      this.baseOutputFilename = baseOutputFilename;
-      if (!isNullOrEmpty(writableByteChannelFactory.getFilenameSuffix())) {
-        this.extension = extension + getFileExtension(
-            writableByteChannelFactory.getFilenameSuffix());
-      } else {
-        this.extension = extension;
-      }
-      this.fileNamingTemplate = fileNamingTemplate;
-    }
-
-    @Override
-    public String unwindowedFilename(FilenamePolicy.Context context) {
-      if (context.numShards <= 0) {
-        return null;
-      }
-
-      String suffix = getFileExtension(extension);
-      String filename = constructName(
-          baseOutputFilename.get(), fileNamingTemplate, suffix, context.getShardNumber(),
-          context.getNumShards());
-      return filename;
-    }
-
-    @Override
-    public String windowedFilename(FilenamePolicy.WindowedContext c) {
-      throw new UnsupportedOperationException("There is no default policy for windowed file"
-          + " output. Please provide an explicit FilenamePolicy to generate filenames.");
-    }
-
-    @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return baseOutputFilename;
-    }
+  public FileBasedSink(
+      ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
+    this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
+  }
 
+  private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
     @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-    String fileNamePattern = String.format("%s%s%s",
-        baseOutputFilename.isAccessible()
-        ? baseOutputFilename.get() : baseOutputFilename.toString(),
-        fileNamingTemplate, getFileExtension(extension));
-    builder.add(DisplayData.item("fileNamePattern", fileNamePattern)
-      .withLabel("File Name Pattern"));
+    public ResourceId apply(ResourceId input) {
+      return input.getCurrentDirectory();
     }
   }
 
   /**
-   * The policy used to generate output filenames.
-   */
-  protected FilenamePolicy fileNamePolicy;
-
-  /**
-   * Construct a FileBasedSink with the given base output filename and extension. A
-   * {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED} will be used.
-   */
-  public FileBasedSink(String baseOutputFilename, String extension) {
-    this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
-  }
-
-  /**
-   * Construct a FileBasedSink with the given base output filename, extension, and
-   * {@link WritableByteChannelFactory}.
+   * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
    */
-  public FileBasedSink(String baseOutputFilename, String extension,
+  public FileBasedSink(
+      ValueProvider<ResourceId> baseOutputDirectoryProvider,
+      FilenamePolicy filenamePolicy,
       WritableByteChannelFactory writableByteChannelFactory) {
-    this(StaticValueProvider.of(baseOutputFilename), extension,
-        ShardNameTemplate.INDEX_OF_MAX, writableByteChannelFactory);
+    this.baseOutputDirectoryProvider =
+        NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
+    this.filenamePolicy = filenamePolicy;
+    this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
   /**
-   * Construct a FileBasedSink with the given base output filename, extension, and file naming
-   * template. A {@link WritableByteChannelFactory} of type {@link CompressionType#UNCOMPRESSED}
-   * will be used.
-   *
-   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
+   * Returns the base directory inside which files will be written according to the configured
+   * {@link FilenamePolicy}.
    */
-  public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    this(StaticValueProvider.of(baseOutputFilename), extension, fileNamingTemplate,
-        CompressionType.UNCOMPRESSED);
-  }
-
-  /**
-   * Construct a FileBasedSink with the given base output filename, extension, file naming template,
-   * and {@link WritableByteChannelFactory}.
-   *
-   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
-   */
-  public FileBasedSink(ValueProvider<String> baseOutputFilename, String extension,
-      String fileNamingTemplate, WritableByteChannelFactory writableByteChannelFactory) {
-    this.writableByteChannelFactory = writableByteChannelFactory;
-    this.fileNamePolicy = new DefaultFilenamePolicy(baseOutputFilename, extension,
-        fileNamingTemplate);
-  }
-
-  public FileBasedSink(FilenamePolicy fileNamePolicy) {
-    this(fileNamePolicy, CompressionType.UNCOMPRESSED);
-
-  }
-
-  public FileBasedSink(FilenamePolicy fileNamePolicy,
-                       WritableByteChannelFactory writableByteChannelFactory) {
-    this.fileNamePolicy = fileNamePolicy;
-    this.writableByteChannelFactory = writableByteChannelFactory;
+  public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
+    return baseOutputDirectoryProvider;
   }
 
   /**
-   * Returns the base output filename for this file based sink.
+   * Returns the policy by which files will be named inside of the base output directory. Note that
+   * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
+   * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
    */
-  public ValueProvider<String> getBaseOutputFilenameProvider() {
-    return fileNamePolicy.getBaseOutputFilenameProvider();
-  }
-
-  public FilenamePolicy getFileNamePolicy() {
-    return fileNamePolicy;
+  public final FilenamePolicy getFilenamePolicy() {
+    return filenamePolicy;
   }
 
   public void validate(PipelineOptions options) {}
@@ -453,22 +365,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   public abstract FileBasedWriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
-    getFileNamePolicy().populateDisplayData(builder);
-  }
-
-  /**
-   * Returns the file extension to be used. If the user did not request a file
-   * extension then this method returns the empty string. Otherwise this method
-   * adds a {@code "."} to the beginning of the users extension if one is not present.
-   */
-  private static String getFileExtension(String usersExtension) {
-    if (usersExtension == null || usersExtension.isEmpty()) {
-      return "";
-    }
-    if (usersExtension.startsWith(".")) {
-      return usersExtension;
-    }
-    return "." + usersExtension;
+    getFilenamePolicy().populateDisplayData(builder);
   }
 
   /**
@@ -518,15 +415,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     protected final FileBasedSink<T> sink;
 
     /** Directory for temporary output files. */
-    protected final ValueProvider<String> tempDirectory;
+    protected final ValueProvider<ResourceId> tempDirectory;
 
     /** Whether windowed writes are being used. */
     protected  boolean windowedWrites;
 
-    /** Constructs a temporary file path given the temporary directory and a filename. */
-    protected static String buildTemporaryFilename(String tempDirectory, String filename)
+    /** Constructs a temporary file resource given the temporary directory and a filename. */
+    protected static ResourceId buildTemporaryFilename(ResourceId tempDirectory, String filename)
         throws IOException {
-      return IOChannelUtils.getFactory(tempDirectory).resolve(tempDirectory, filename);
+      return tempDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     /**
@@ -540,30 +437,26 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      */
     public FileBasedWriteOperation(FileBasedSink<T> sink) {
       this(sink, NestedValueProvider.of(
-          sink.getBaseOutputFilenameProvider(), new TemporaryDirectoryBuilder()));
+          sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
     }
 
     private static class TemporaryDirectoryBuilder
-        implements SerializableFunction<String, String> {
+        implements SerializableFunction<ResourceId, ResourceId> {
+      private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
+      private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
+          DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
       // The intent of the code is to have a consistent value of tempDirectory across
       // all workers, which wouldn't happen if now() was called inline.
-      Instant now = Instant.now();
+      private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
+      // Multiple different sinks may be used in the same output directory; use tempId to create a
+      // separate temp directory for each.
+      private final Long tempId = TEMP_COUNT.getAndIncrement();
 
       @Override
-      public String apply(String baseOutputFilename) {
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(baseOutputFilename);
-          Path baseOutputPath = factory.toPath(baseOutputFilename);
-          return baseOutputPath
-              .resolveSibling(
-                  "temp-beam-"
-                  + baseOutputPath.getFileName()
-                  + "-"
-                  + now.toString(DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss")))
-              .toString();
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
+      public ResourceId apply(ResourceId baseOutputDirectory) {
+        // The temp directory name includes a timestamp and a unique ID.
+        String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
+        return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
       }
     }
 
@@ -573,11 +466,12 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * @param sink the FileBasedSink that will be used to configure this write operation.
      * @param tempDirectory the base directory to be used for temporary output files.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String tempDirectory) {
+    public FileBasedWriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
-    private FileBasedWriteOperation(FileBasedSink<T> sink, ValueProvider<String> tempDirectory) {
+    private FileBasedWriteOperation(
+        FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
@@ -587,7 +481,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}. This
      * method must not mutate the state of the object.
      */
-    public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
+    public abstract FileBasedWriter<T> createWriter() throws Exception;
 
     /**
      * Indicates that the operation will be performing windowed writes.
@@ -610,11 +504,10 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * @param writerResults the results of writes (FileResult).
      */
-    public void finalize(Iterable<FileResult> writerResults,
-                         PipelineOptions options) throws Exception {
+    public void finalize(Iterable<FileResult> writerResults) throws Exception {
       // Collect names of temporary files and rename them.
-      Map<String, String> outputFilenames = buildOutputFilenames(writerResults);
-      copyToOutputFiles(outputFilenames, options);
+      Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
+      copyToOutputFiles(outputFilenames);
 
       // Optionally remove temporary files.
       // We remove the entire temporary directory, rather than specifically removing the files
@@ -625,12 +518,13 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       //
       // When windows or triggers are specified, files are generated incrementally so deleting
       // the entire directory in finalize is incorrect.
-      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites, options);
+      removeTemporaryFiles(outputFilenames.keySet(), !windowedWrites);
     }
 
-    protected final Map<String, String> buildOutputFilenames(Iterable<FileResult> writerResults) {
-      Map<String, String> outputFilenames = new HashMap<>();
-      List<String> files = new ArrayList<>();
+    protected final Map<ResourceId, ResourceId> buildOutputFilenames(
+        Iterable<FileResult> writerResults) {
+      Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
+      List<ResourceId> files = new ArrayList<>();
       for (FileResult result : writerResults) {
         if (result.getDestinationFilename() != null) {
           outputFilenames.put(result.getFilename(), result.getDestinationFilename());
@@ -639,20 +533,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         }
       }
 
-      // If the user does not specify numShards() (not supported with windowing). Then the
       // writerResults won't contain destination filenames, so we dynamically generate them here.
       if (files.size() > 0) {
         checkArgument(outputFilenames.isEmpty());
         // Sort files for idempotence.
-        files = Ordering.natural().sortedCopy(files);
-        FilenamePolicy filenamePolicy = getSink().fileNamePolicy;
+        files = Ordering.usingToString().sortedCopy(files);
+        ResourceId outputDirectory = getSink().getBaseOutputDirectoryProvider().get();
+        FilenamePolicy filenamePolicy = getSink().filenamePolicy;
         for (int i = 0; i < files.size(); i++) {
           outputFilenames.put(files.get(i),
-              filenamePolicy.unwindowedFilename(new Context(i, files.size())));
+              filenamePolicy.unwindowedFilename(outputDirectory, new Context(i, files.size()),
+                  getSink().getExtension()));
         }
       }
 
-      int numDistinctShards = new HashSet<String>(outputFilenames.values()).size();
+      int numDistinctShards = new HashSet<ResourceId>(outputFilenames.values()).size();
       checkState(numDistinctShards == outputFilenames.size(),
          "Only generated %s distinct file names for %s files.",
          numDistinctShards, outputFilenames.size());
@@ -673,15 +568,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      *
      * @param filenames the filenames of temporary files.
      */
-    protected final void copyToOutputFiles(Map<String, String> filenames,
-                                           PipelineOptions options)
+    @VisibleForTesting
+    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
         throws IOException {
       int numFiles = filenames.size();
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        IOChannelFactory channelFactory =
-            IOChannelUtils.getFactory(filenames.values().iterator().next());
-        channelFactory.copy(filenames.keySet(), filenames.values());
+        List<ResourceId> srcFiles = new ArrayList<>(filenames.size());
+        List<ResourceId> dstFiles = new ArrayList<>(filenames.size());
+        for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) {
+          srcFiles.add(srcDestPair.getKey());
+          dstFiles.add(srcDestPair.getValue());
+        }
+        // During a failure case, files may have been deleted in an earlier step. Thus
+        // we ignore missing files here.
+        FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
       } else {
         LOG.info("No output files to write.");
       }
@@ -694,13 +595,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
-    protected final void removeTemporaryFiles(Set<String> knownFiles,
-                                              boolean shouldRemoveTemporaryDirectory,
-                                              PipelineOptions options)
-        throws IOException {
-      String tempDir = tempDirectory.get();
+    @VisibleForTesting
+    final void removeTemporaryFiles(
+        Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
+      ResourceId tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
-      IOChannelFactory factory = IOChannelUtils.getFactory(tempDir);
 
       // To partially mitigate the effects of filesystems with eventually-consistent
       // directory matching APIs, we remove not only files that the filesystem says exist
@@ -709,17 +608,21 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
-      Set<String> matches = new HashSet<>();
+      Set<ResourceId> matches = new HashSet<>();
      // TODO: Windows OS cannot resolve and match '*' in the path,
       // ignore the exception for now to avoid failing the pipeline.
       if (shouldRemoveTemporaryDirectory) {
         try {
-          matches.addAll(factory.match(factory.resolve(tempDir, "*")));
+          MatchResult singleMatch = Iterables.getOnlyElement(
+              FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
+          for (Metadata matchResult : singleMatch.metadata()) {
+            matches.add(matchResult.resourceId());
+          }
         } catch (Exception e) {
           LOG.warn("Failed to match temporary files under: [{}].", tempDir);
         }
       }
-      Set<String> allMatches = new HashSet<>(matches);
+      Set<ResourceId> allMatches = new HashSet<>(matches);
       allMatches.addAll(knownFiles);
       LOG.debug(
           "Removing {} temporary files found under {} ({} matched glob, {} known files)",
@@ -727,23 +630,18 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
           tempDir,
           matches.size(),
           allMatches.size() - matches.size());
+      FileSystems.delete(allMatches, StandardMoveOptions.IGNORE_MISSING_FILES);
+
       // Deletion of the temporary directory might fail, if not all temporary files are removed.
       try {
-        factory.remove(allMatches);
-        factory.remove(ImmutableList.of(tempDir));
+        FileSystems.delete(
+            Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
       } catch (Exception e) {
         LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
       }
     }
 
     /**
-     * Provides a coder for {@link FileBasedSink.FileResult}.
-     */
-    public final Coder<FileResult> getFileResultCoder() {
-      return FileResultCoder.of();
-    }
-
-    /**
      * Returns the FileBasedSink for this write operation.
      */
     public FileBasedSink<T> getSink() {
@@ -751,6 +649,15 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     }
   }
 
+  /** Returns the extension that will be written to the produced files. */
+  protected final String getExtension() {
+    String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
+    if (!extension.isEmpty() && !extension.startsWith(".")) {
+      extension = "." + extension;
+    }
+    return extension;
+  }
+
   /**
    * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
    * implementations provide a method that can write a single value to a
@@ -760,19 +667,17 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
    * channel.
    *
-   * <p>Multiple FileBasedWriter instances may be created on the same worker, and therefore any
-   * access to static members or methods should be thread safe.
+   * <p>Multiple {@link FileBasedWriter} instances may be created on the same worker, and therefore
+   * any access to static members or methods should be thread safe.
    *
    * @param <T> the type of values to write.
    */
   public abstract static class FileBasedWriter<T> {
     private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
 
-    final FileBasedWriteOperation<T> writeOperation;
+    private final FileBasedWriteOperation<T> writeOperation;
 
-    /**
-     * Unique id for this output bundle.
-     */
+    /** Unique id for this output bundle. */
     private String id;
 
     private BoundedWindow window;
@@ -780,10 +685,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private int shard = -1;
     private int numShards = -1;
 
-    /**
-     * The filename of the output bundle.
-     */
-    private String filename;
+    /** The output file for this bundle. May be null if opening failed. */
+    private @Nullable ResourceId outputFile;
 
     /**
      * The channel to write to.
@@ -801,7 +704,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private final String mimeType;
 
     /**
-     * Construct a new FileBasedWriter with a base filename.
+     * Construct a new {@link FileBasedWriter} that will produce files of the given MIME type.
      */
     public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
@@ -846,11 +749,9 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * shard and numShards are populated for the case of static sharding. In cases where the
      * runner is dynamically picking sharding, shard and numShards might both be set to -1.
      */
-    public final void openWindowed(String uId,
-                                   BoundedWindow window,
-                                   PaneInfo paneInfo,
-                                   int shard,
-                                   int numShards) throws Exception {
+    public final void openWindowed(
+        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, int numShards)
+        throws Exception {
       if (!getWriteOperation().windowedWrites) {
        throw new IllegalStateException("openWindowed called on a non-windowed sink.");
       }
@@ -875,6 +776,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       open(uId, null, null, shard, numShards);
     }
 
+    // Helper to close a channel when a prior exception has occurred.
+    // Always rethrows the prior exception, with any new closing exception suppressed.
+    private static void closeChannelAndThrow(
+        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
+      try {
+        channel.close();
+      } catch (Exception e) {
+        LOG.error("Closing channel for {} failed.", filename, e);
+        prior.addSuppressed(e);
+        throw prior;
+      }
+    }
+
     private void open(String uId,
                       @Nullable BoundedWindow window,
                       @Nullable PaneInfo paneInfo,
@@ -885,64 +799,98 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       this.paneInfo = paneInfo;
       this.shard = shard;
       this.numShards = numShards;
-      filename = FileBasedWriteOperation.buildTemporaryFilename(
-          getWriteOperation().tempDirectory.get(), uId);
-      LOG.debug("Opening {}.", filename);
+      ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
+      outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
+      verifyNotNull(
+          outputFile, "FileSystems are not allowed to return null from resolve: %s", tempDirectory);
+
       final WritableByteChannelFactory factory =
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      channel = factory.create(IOChannelUtils.create(filename, channelMimeType));
+      LOG.debug("Opening {} for write with MIME type {}.", outputFile, channelMimeType);
+      WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
+      try {
+        channel = factory.create(tempChannel);
+      } catch (Exception e) {
+        // If we have opened the underlying channel but fail to open the compression channel,
+        // we should still close the underlying channel.
+        closeChannelAndThrow(tempChannel, outputFile, e);
+      }
+
+      // The caller shouldn't have to close() this FileBasedWriter if it fails to open(), so close
+      // the channel if prepareWrite() or writeHeader() fails.
       try {
+        LOG.debug("Preparing write to {}.", outputFile);
         prepareWrite(channel);
-        LOG.debug("Writing header to {}.", filename);
+
+        LOG.debug("Writing header to {}.", outputFile);
         writeHeader();
       } catch (Exception e) {
-        // The caller shouldn't have to close() this Writer if it fails to open(), so close the
-        // channel if prepareWrite() or writeHeader() fails.
-        try {
-          LOG.error("Writing header to {} failed, closing channel.", filename);
-          channel.close();
-        } catch (IOException closeException) {
-          // Log exception and mask it.
-          LOG.error("Closing channel for {} failed: {}", filename, closeException.getMessage());
-        }
-        // Throw the exception that caused the write to fail.
-        throw e;
+        LOG.error("Beginning write to {} failed, closing channel.", step, outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
       }
-      LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
+
+      LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile);
     }
 
     public final void cleanup() throws Exception {
-      if (filename != null) {
-        IOChannelUtils.getFactory(filename).remove(Lists.<String>newArrayList(filename));
+      if (outputFile != null) {
+        // outputFile may be null if open() was not called or failed.
+        FileSystems.delete(
+            Collections.singletonList(outputFile), StandardMoveOptions.IGNORE_MISSING_FILES);
       }
     }
 
-    /**
-     * Closes the channel and returns the bundle result.
-     */
+    /** Closes the channel and returns the bundle result. */
     public final FileResult close() throws Exception {
-      try (WritableByteChannel theChannel = channel) {
-        LOG.debug("Writing footer to {}.", filename);
+      checkState(
+          outputFile != null, "FileBasedWriter.close cannot be called with a null outputFile");
+
+      LOG.debug("Writing footer to {}.", outputFile);
+      try {
         writeFooter();
-        LOG.debug("Finishing write to {}.", filename);
+      } catch (Exception e) {
+        LOG.error("Writing footer to {} failed, closing channel.", outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
+      }
+
+      LOG.debug("Finishing write to {}.", outputFile);
+      try {
         finishWrite();
-        if (!channel.isOpen()) {
-          throw new IllegalStateException("Channel should only be closed by its owner: " + channel);
-        }
+      } catch (Exception e) {
+        LOG.error("Finishing write to {} failed, closing channel.", outputFile, e);
+        closeChannelAndThrow(channel, outputFile, e);
       }
 
-      FilenamePolicy filenamePolicy = getWriteOperation().getSink().fileNamePolicy;
-      String destinationFile;
+      checkState(
+          channel.isOpen(),
+          "Channel %s to %s should only be closed by its owner: %s", channel, outputFile);
+
+      LOG.debug("Closing channel to {}.", outputFile);
+      try {
+        channel.close();
+      } catch (Exception e) {
+        throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
+      }
+
+      FileBasedSink<T> sink = getWriteOperation().getSink();
+      ResourceId outputDirectory = sink.getBaseOutputDirectoryProvider().get();
+      FilenamePolicy filenamePolicy = sink.filenamePolicy;
+      String extension = sink.getExtension();
+      @Nullable ResourceId destinationFile;
       if (window != null) {
-        destinationFile = filenamePolicy.windowedFilename(new WindowedContext(
-            window, paneInfo, shard, numShards));
+        destinationFile = filenamePolicy.windowedFilename(outputDirectory, new WindowedContext(
+            window, paneInfo, shard, numShards), extension);
+      } else if (numShards > 0) {
+        destinationFile = filenamePolicy.unwindowedFilename(
+            outputDirectory, new Context(shard, numShards), extension);
       } else {
-        destinationFile =  filenamePolicy.unwindowedFilename(new Context(shard, numShards));
+        // Destination filename to be generated in the next step.
+        destinationFile = null;
       }
-      FileResult result = new FileResult(filename, destinationFile);
-      LOG.debug("Result for bundle {}: {} {}", this.id, filename, destinationFile);
+      FileResult result = new FileResult(outputFile, destinationFile);
+      LOG.debug("Result for bundle {}: {} {}", this.id, outputFile, destinationFile);
       return result;
     }
 
@@ -955,33 +903,44 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /**
-   * Result of a single bundle write. Contains the filename of the bundle.
+   * Result of a single bundle write. Contains the filename produced by the bundle and, if known,
+   * the final output filename.
    */
-  public static final class FileResult {
-    private final String filename;
-    private final String destinationFilename;
+  public static final class FileResult implements Serializable {
+    private final ResourceId filename;
+    @Nullable private final ResourceId destinationFilename;
 
-    public FileResult(String filename, String destinationFilename) {
+    public FileResult(ResourceId filename, @Nullable ResourceId destinationFilename) {
       this.filename = filename;
       this.destinationFilename = destinationFilename;
     }
 
-    public String getFilename() {
+    public ResourceId getFilename() {
       return filename;
     }
 
-    public String getDestinationFilename() {
+    /**
+     * The final destination filename for this bundle. Will be null if the destination is unknown
+     * because the number of shards is determined dynamically by the runner.
+     */
+    @Nullable public ResourceId getDestinationFilename() {
       return destinationFilename;
     }
 
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(FileResult.class)
+          .add("filename", filename)
+          .add("destinationFilename", destinationFilename)
+          .toString();
+    }
   }
 
   /**
-   * A coder for FileResult objects.
+   * A coder for {@link FileResult} objects.
    */
   public static final class FileResultCoder extends CustomCoder<FileResult> {
     private static final FileResultCoder INSTANCE = new FileResultCoder();
-    private final Coder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+    private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
 
     public static FileResultCoder of() {
       return INSTANCE;
@@ -993,25 +952,33 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getFilename(), outStream, context.nested());
-      stringCoder.encode(value.getDestinationFilename(), outStream, context.nested());
+      stringCoder.encode(value.getFilename().toString(), outStream, context.nested());
+      if (value.getDestinationFilename() == null) {
+        stringCoder.encode(null, outStream, context);
+      } else {
+        stringCoder.encode(value.getDestinationFilename().toString(), outStream, context);
+      }
     }
 
     @Override
     public FileResult decode(InputStream inStream, Context context)
         throws IOException {
+      String filename = stringCoder.decode(inStream, context.nested());
+      assert filename != null;  // fixes a compiler warning
+      @Nullable String destinationFilename = stringCoder.decode(inStream, context);
       return new FileResult(
-          stringCoder.decode(inStream, context.nested()),
-          stringCoder.decode(inStream, context.nested()));
+          FileSystems.matchNewResource(filename, false /* isDirectory */),
+          destinationFilename == null
+              ? null
+              : FileSystems.matchNewResource(destinationFilename, false /* isDirectory */));
     }
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this, "TableRows are not deterministic.");
+      stringCoder.verifyDeterministic();
     }
   }
 
-
   /**
    * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
    * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that


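The FilenamePolicy changes in this commit replace the String-returning methods with ResourceId-returning ones that receive the base output directory and a (possibly empty) extension. A hedged sketch of a custom windowed policy under the new signatures (the class and directory layout are illustrative, not part of this commit):

    // Places each window's shards under a per-window subdirectory, e.g.
    // <outputDirectory>/<window max timestamp>/part-0-of-3.txt
    class PerWindowPolicy extends FileBasedSink.FilenamePolicy {
      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext c, String extension) {
        String window = c.getWindow().maxTimestamp().toString();
        return outputDirectory
            .resolve(window, StandardResolveOptions.RESOLVE_DIRECTORY)
            .resolve(
                String.format(
                    "part-%s-of-%s%s", c.getShardNumber(), c.getNumShards(), extension),
                StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context c, String extension) {
        throw new UnsupportedOperationException("This policy only supports windowed writes.");
      }
    }

Resolving against the outputDirectory argument, rather than storing a base path in the policy, is what lets FileBasedSink own the output directory and the temporary-file lifecycle.
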
[4/4] beam git commit: This closes #2779

Posted by dh...@apache.org.
This closes #2779


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc50d62
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc50d62
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc50d62

Branch: refs/heads/master
Commit: 1bc50d627289e6cbce617424744628a8cda4c52a
Parents: b4bafd0 1735824
Author: Dan Halperin <dh...@google.com>
Authored: Wed May 3 17:45:10 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed May 3 17:45:10 2017 -0700

----------------------------------------------------------------------
 .../examples/common/WriteOneFilePerWindow.java  |  52 +-
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../direct/WriteWithShardingFactoryTest.java    |  32 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  12 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   4 +
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 213 ++++---
 .../java/org/apache/beam/sdk/io/AvroSink.java   |  22 +-
 .../beam/sdk/io/DefaultFilenamePolicy.java      | 169 ++++++
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 593 +++++++++----------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 126 ++--
 .../java/org/apache/beam/sdk/io/TextIO.java     | 249 ++++----
 .../java/org/apache/beam/sdk/io/TextSink.java   |  20 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java |  15 +-
 .../apache/beam/sdk/util/IOChannelUtils.java    |   6 +-
 .../apache/beam/sdk/util/NoopPathValidator.java |  12 +-
 .../beam/sdk/util/NumberedShardedFile.java      |   4 +-
 .../org/apache/beam/sdk/util/PathValidator.java |  15 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 +-
 .../beam/sdk/io/DefaultFilenamePolicyTest.java  |  55 ++
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 330 +++++------
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  27 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  45 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  41 +-
 .../apache/beam/sdk/util/GcsPathValidator.java  |  22 +-
 .../java/org/apache/beam/sdk/io/xml/XmlIO.java  |  15 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  16 +-
 .../org/apache/beam/sdk/io/xml/XmlSinkTest.java |  52 +-
 27 files changed, 1251 insertions(+), 971 deletions(-)
----------------------------------------------------------------------

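Ahead of the TFRecordIO changes below, a usage sketch of the reworked write API after this commit (the pipeline, elements, and paths are illustrative):

    // records is a PCollection<byte[]> of serialized examples.
    records.apply(
        TFRecordIO.write()
            .to("/tmp/out/records")      // prefix, converted to a ResourceId
            .withSuffix(".tfrecord")
            .withNumShards(3));
    // With the default shard template this produces, e.g.,
    // /tmp/out/records-00000-of-00003.tfrecord
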


[2/4] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index fe0b97d..3198829 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -36,10 +36,12 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -73,9 +75,9 @@ public class TFRecordIO {
    */
   public static Write write() {
     return new AutoValue_TFRecordIO_Write.Builder()
-        .setFilenameSuffix("")
+        .setShardTemplate(null)
+        .setFilenameSuffix(null)
         .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
         .setCompressionType(CompressionType.NONE)
         .build();
   }
@@ -212,7 +214,7 @@ public class TFRecordIO {
 
     @Override
     protected Coder<byte[]> getDefaultOutputCoder() {
-      return DEFAULT_BYTE_ARRAY_CODER;
+      return ByteArrayCoder.of();
     }
   }
 
@@ -221,20 +223,17 @@ public class TFRecordIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
-    /** The prefix of each file written, combined with suffix and shardTemplate. */
-    @Nullable
-    abstract ValueProvider<String> getFilenamePrefix();
+    /** The prefix of each file written, combined with suffix and shardTemplate. */
+    @Nullable abstract ValueProvider<ResourceId> getOutputPrefix();
 
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    abstract String getFilenameSuffix();
+    @Nullable abstract String getFilenameSuffix();
 
     /** Requested number of shards. 0 for automatic. */
     abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    abstract String getShardTemplate();
+    @Nullable abstract String getShardTemplate();
 
     /** Option to indicate the output sink's compression type. Default is NONE. */
     abstract CompressionType getCompressionType();
@@ -243,38 +242,51 @@ public class TFRecordIO {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+      abstract Builder setOutputPrefix(ValueProvider<ResourceId> outputPrefix);
+
+      abstract Builder setShardTemplate(String shardTemplate);
 
       abstract Builder setFilenameSuffix(String filenameSuffix);
 
       abstract Builder setNumShards(int numShards);
 
-      abstract Builder setShardTemplate(String shardTemplate);
-
       abstract Builder setCompressionType(CompressionType compressionType);
 
       abstract Write build();
     }
 
     /**
-     * Writes to TFRecord file(s) with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes TFRecord file(s) with the given output prefix. The {@code outputPrefix} will be
+     * used to generate a {@link ResourceId} using any supported {@link FileSystem}.
+     *
+     * <p>In addition to their prefix, created files will have a shard identifier (see
+     * {@link #withNumShards(int)}), and end in a common suffix, if given by
+     * {@link #withSuffix(String)}.
+     *
+     * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
+     */
+    public Write to(String outputPrefix) {
+      return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+    }
+
+    /**
+     * Writes TFRecord file(s) with a prefix given by the specified resource.
+     *
+     * <p>In addition to their prefix, created files will have a shard identifier (see
+     * {@link #withNumShards(int)}), and end in a common suffix, if given by
+     * {@link #withSuffix(String)}.
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards(int)}, and end
-     * in a common extension, if given by {@link #withSuffix(String)}.
+     * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
      */
-    public Write to(String filenamePrefix) {
-      return to(StaticValueProvider.of(filenamePrefix));
+    public Write to(ResourceId outputResource) {
+      return toResource(StaticValueProvider.of(outputResource));
     }
 
     /**
-     * Like {@link #to(String)}, but with a {@link ValueProvider}.
+     * Like {@link #to(ResourceId)}, but with a {@link ValueProvider}.
      */
-    public Write to(ValueProvider<String> filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write toResource(ValueProvider<ResourceId> outputResource) {
+      return toBuilder().setOutputPrefix(outputResource).build();
     }
 
     /**
@@ -282,8 +294,8 @@ public class TFRecordIO {
      *
      * @see ShardNameTemplate
      */
-    public Write withSuffix(String nameExtension) {
-      return toBuilder().setFilenameSuffix(nameExtension).build();
+    public Write withSuffix(String suffix) {
+      return toBuilder().setFilenameSuffix(suffix).build();
     }
 
     /**
@@ -298,7 +310,7 @@ public class TFRecordIO {
      * @see ShardNameTemplate
      */
     public Write withNumShards(int numShards) {
-      checkArgument(numShards >= 0);
+      checkArgument(numShards >= 0, "Number of shards %s must be >= 0", numShards);
       return toBuilder().setNumShards(numShards).build();
     }
 
@@ -338,16 +350,13 @@ public class TFRecordIO {
 
     @Override
     public PDone expand(PCollection<byte[]> input) {
-      if (getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of a TFRecordIO.Write transform");
-     }
-      org.apache.beam.sdk.io.WriteFiles<byte[]> write =
-          org.apache.beam.sdk.io.WriteFiles.to(
+      checkState(getOutputPrefix() != null,
+          "need to set the output prefix of a TFRecordIO.Write transform");
+      WriteFiles<byte[]> write = WriteFiles.to(
               new TFRecordSink(
-                  getFilenamePrefix(),
-                  getFilenameSuffix(),
+                  getOutputPrefix(),
                   getShardTemplate(),
+                  getFilenameSuffix(),
                   getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
@@ -359,20 +368,23 @@ public class TFRecordIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String prefixString = getFilenamePrefix().isAccessible()
-          ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+      String outputPrefixString;
+      if (getOutputPrefix().isAccessible()) {
+        ResourceId prefix = getOutputPrefix().get();
+        outputPrefixString = prefix.toString();
+      } else {
+        outputPrefixString = getOutputPrefix().toString();
+      }
       builder
-          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+          .add(DisplayData.item("filePrefix", outputPrefixString)
               .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-              .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-                  .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+              .withLabel("Output File Suffix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+                  .withLabel("Output Shard Name Template"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"), 0)
-          .add(DisplayData
-              .item("compressionType", getCompressionType().toString())
+          .add(DisplayData.item("compressionType", getCompressionType().toString())
               .withLabel("Compression Type"));
     }
 
@@ -537,14 +549,24 @@ public class TFRecordIO {
   @VisibleForTesting
   static class TFRecordSink extends FileBasedSink<byte[]> {
     @VisibleForTesting
-    TFRecordSink(ValueProvider<String> baseOutputFilename,
-                 String extension,
-                 String fileNameTemplate,
-                 TFRecordIO.CompressionType compressionType) {
-      super(baseOutputFilename, extension, fileNameTemplate,
+    TFRecordSink(ValueProvider<ResourceId> outputPrefix,
+        @Nullable String shardTemplate,
+        @Nullable String suffix,
+        TFRecordIO.CompressionType compressionType) {
+      super(
+          outputPrefix,
+          DefaultFilenamePolicy.constructUsingStandardParameters(
+              outputPrefix, shardTemplate, suffix),
           writableByteChannelFactory(compressionType));
     }
 
+    private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
+      @Override
+      public ResourceId apply(ResourceId input) {
+        return input.getCurrentDirectory();
+      }
+    }
+
     @Override
     public FileBasedWriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
@@ -575,7 +597,7 @@ public class TFRecordIO {
       }
 
       @Override
-      public FileBasedWriter<byte[]> createWriter(PipelineOptions options) throws Exception {
+      public FileBasedWriter<byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }

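For illustration, a minimal sketch of the TFRecordIO.Write surface after this
change; the output path, element values, and shard count below are
placeholders, not part of the commit:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.io.TFRecordIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class TFRecordWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of(
                "first record".getBytes(StandardCharsets.UTF_8),
                "second record".getBytes(StandardCharsets.UTF_8))
            .withCoder(ByteArrayCoder.of()))
            // to(String) now resolves the prefix into a ResourceId via FileSystems
            // (see FileBasedSink.convertToFileResourceIfPossible) instead of going
            // through IOChannelFactory.
            .apply(TFRecordIO.write()
                .to("/tmp/tfrecords/output")
                .withSuffix(".tfrecord")
                .withNumShards(2));
        p.run();
      }
    }
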
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6b08e1f..dbfaeee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
@@ -28,9 +29,12 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -39,16 +43,13 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing text files.
  *
- * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}.
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
+ * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
+ * file(s) to be read.
  *
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
- * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n',
- * '\r', or '\r\n').
+ * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
+ * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
+ * or '\r\n').
  *
  * <p>Example:
  *
@@ -56,16 +57,11 @@ import org.apache.beam.sdk.values.PDone;
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
- * PCollection<String> lines =
- *     p.apply(TextIO.read().from("/local/path/to/file.txt"));
+ * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
- * <p>To write a {@link PCollection} to one or more text files, use
- * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
+ * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
@@ -75,8 +71,7 @@ import org.apache.beam.sdk.values.PDone;
 * runner-chosen value, so you may not need to set it yourself. A {@link FilenamePolicy} must be
  * set, and unique windows and triggers must produce unique filenames.
  *
- * <p>Any existing files with the same names as generated output files
- * will be overwritten.
+ * <p>Any existing files with the same names as generated output files will be overwritten.
  *
  * <p>For example:
  * <pre>{@code
@@ -93,25 +88,27 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class TextIO {
   /**
-   * Reads from one or more text files and returns a bounded {@link PCollection} containing one
-   * element for each line of the input files.
+   * A {@link PTransform} that reads from one or more text files and returns a bounded
+   * {@link PCollection} containing one element for each line of the input files.
    */
   public static Read read() {
     return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
   }
 
   /**
-   * A {@link PTransform} that writes a {@link PCollection} to text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
+   * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * line.
    */
   public static Write write() {
     return new AutoValue_TextIO_Write.Builder()
-        .setFilenameSuffix("")
-        .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setFilenamePrefix(null)
+        .setShardTemplate(null)
+        .setFilenameSuffix(null)
+        .setFilenamePolicy(null)
         .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
+        .setNumShards(0)
         .build();
   }
 
@@ -228,13 +225,11 @@ public class TextIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<String>, PDone> {
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
     /** The prefix of each file written, combined with suffix and shardTemplate. */
-    @Nullable abstract ValueProvider<String> getFilenamePrefix();
+    @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    abstract String getFilenameSuffix();
+    @Nullable abstract String getFilenameSuffix();
 
     /** An optional header to add to each file. */
     @Nullable abstract String getHeader();
@@ -246,7 +241,7 @@ public class TextIO {
     abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    abstract String getShardTemplate();
+    @Nullable abstract String getShardTemplate();
 
     /** A policy for naming output files. */
     @Nullable abstract FilenamePolicy getFilenamePolicy();
@@ -264,13 +259,13 @@ public class TextIO {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
-      abstract Builder setFilenameSuffix(String filenameSuffix);
-      abstract Builder setHeader(String header);
-      abstract Builder setFooter(String footer);
+      abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+      abstract Builder setShardTemplate(@Nullable String shardTemplate);
+      abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
+      abstract Builder setHeader(@Nullable String header);
+      abstract Builder setFooter(@Nullable String footer);
+      abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
       abstract Builder setNumShards(int numShards);
-      abstract Builder setShardTemplate(String shardTemplate);
-      abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
       abstract Builder setWindowedWrites(boolean windowedWrites);
       abstract Builder setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
@@ -279,72 +274,115 @@ public class TextIO {
     }
 
     /**
-     * Writes to text files with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes to text files with the given prefix. The given {@code prefix} can reference any
+     * {@link FileSystem} on the classpath.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will be built using the specified prefix
+     * to define the base output directory and file prefix, a shard identifier (see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards(int)}, and end
-     * in a common extension, if given by {@link #withSuffix(String)}.
+     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+     * not be set.
      */
     public Write to(String filenamePrefix) {
-      return to(StaticValueProvider.of(filenamePrefix));
+      return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
-    public Write to(ValueProvider<String> filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    /**
+     * Writes to text files with a prefix given by the specified resource.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will be built using the specified prefix
+     * to define the base output directory and file prefix, a shard identifier (see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
+     *
+     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+     * not be set.
+     */
+    public Write to(ResourceId filenamePrefix) {
+      return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
-    public Write to(FilenamePolicy filenamePolicy) {
-      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    /**
+     * Like {@link #to(String)}, but with a {@link ValueProvider}.
+     */
+    public Write to(ValueProvider<String> outputPrefix) {
+      return toResource(NestedValueProvider.of(outputPrefix,
+          new SerializableFunction<String, ResourceId>() {
+            @Override
+            public ResourceId apply(String input) {
+              return FileBasedSink.convertToFileResourceIfPossible(input);
+            }
+          }));
     }
 
     /**
-     * Writes to the file(s) with the given filename suffix.
-     *
-     * @see ShardNameTemplate
+     * Like {@link #to(ResourceId)}, but with a {@link ValueProvider}.
      */
-    public Write withSuffix(String nameExtension) {
-      return toBuilder().setFilenameSuffix(nameExtension).build();
+    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /**
-     * Uses the provided shard count.
+     * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
+     */
+    public Write withShardNameTemplate(String shardTemplate) {
+      return toBuilder().setShardTemplate(shardTemplate).build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. This option may only be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
-     * @see ShardNameTemplate
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
      */
-    public Write withNumShards(int numShards) {
-      checkArgument(numShards >= 0);
-      return toBuilder().setNumShards(numShards).build();
+    public Write withSuffix(String filenameSuffix) {
+      return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
-     * Uses the given shard name template.
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+     */
+    public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    }
+
+    /**
+     * Configures the number of output shards produced overall (when using unwindowed writes) or
+     * per-window (when using windowed writes).
+     *
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
-     * @see ShardNameTemplate
+     * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public Write withShardNameTemplate(String shardTemplate) {
-      return toBuilder().setShardTemplate(shardTemplate).build();
+    public Write withNumShards(int numShards) {
+      checkArgument(numShards >= 0, "Number of shards %s must be >= 0", numShards);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
-     * Forces a single file as output.
+     * Forces a single file as output and an empty shard name template. This option is only
+     * compatible with unwindowed writes.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Using this setting is not recommended
-     * unless you truly require a single output file.
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Using this setting is not recommended unless you truly
+     * require a single output file.
      *
-     * <p>This is a shortcut for
-     * {@code .withNumShards(1).withShardNameTemplate("")}
+     * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}.
      */
     public Write withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
@@ -386,34 +424,26 @@ public class TextIO {
 
     @Override
     public PDone expand(PCollection<String> input) {
-      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of an TextIO.Write transform");
-      }
-      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
-        throw new IllegalStateException(
-            "cannot set both a filename policy and a filename prefix");
-      }
-      WriteFiles<String> write;
-      if (getFilenamePolicy() != null) {
-        write =
-            WriteFiles.to(
-                new TextSink(
-                    getFilenamePolicy(),
-                    getHeader(),
-                    getFooter(),
-                    getWritableByteChannelFactory()));
-      } else {
-        write =
-            WriteFiles.to(
-                new TextSink(
-                    getFilenamePrefix(),
-                    getFilenameSuffix(),
-                    getHeader(),
-                    getFooter(),
-                    getShardTemplate(),
-                    getWritableByteChannelFactory()));
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of a TextIO.Write transform.");
+      checkState(
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or suffix.");
+
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
       }
+      WriteFiles<String> write =
+          WriteFiles.to(
+              new TextSink(
+                  getFilenamePrefix(),
+                  usedFilenamePolicy,
+                  getHeader(),
+                  getFooter(),
+                  getWritableByteChannelFactory()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -430,16 +460,15 @@ public class TextIO {
       String prefixString = "";
       if (getFilenamePrefix() != null) {
         prefixString = getFilenamePrefix().isAccessible()
-            ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+            ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString();
       }
       builder
           .addIfNotNull(DisplayData.item("filePrefix", prefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-            .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-            .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+            .withLabel("Output File Suffix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+            .withLabel("Output Shard Name Template"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
             .withLabel("Maximum Output Shards"), 0)
           .addIfNotNull(DisplayData.item("fileHeader", getHeader())

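A similar sketch for the reworked TextIO.Write builder; the suffix and shard
name template below feed the DefaultFilenamePolicy and must not be combined
with withFilenamePolicy(...) (paths and values are placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class TextWriteSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(Create.of("alpha", "beta", "gamma"))
            .apply(TextIO.write()
                .to("/tmp/text/output")              // resolved to a ResourceId
                .withSuffix(".txt")
                .withShardNameTemplate("-SS-of-NN")
                .withNumShards(1));
        p.run();
      }
    }
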
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 4efdc32..0ba537e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
@@ -39,27 +39,15 @@ class TextSink extends FileBasedSink<String> {
   @Nullable private final String footer;
 
   TextSink(
+      ValueProvider<ResourceId> baseOutputFilename,
       FilenamePolicy filenamePolicy,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
-    super(filenamePolicy, writableByteChannelFactory);
+    super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
   }
-
-  TextSink(
-      ValueProvider<String> baseOutputFilename,
-      String extension,
-      @Nullable String header,
-      @Nullable String footer,
-      String fileNameTemplate,
-      WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
-    this.header = header;
-    this.footer = footer;
-  }
-
   @Override
   public FileBasedWriteOperation<String> createWriteOperation() {
     return new TextWriteOperation(this, header, footer);
@@ -77,7 +65,7 @@ class TextSink extends FileBasedSink<String> {
     }
 
     @Override
-    public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+    public FileBasedWriter<String> createWriter() throws Exception {
       return new TextWriter(this, header, footer);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index dcd600f..2a057e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -254,7 +255,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // Lazily initialize the Writer
       if (writer == null) {
         LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
+        writer = writeOperation.createWriter();
 
         if (windowedWrites) {
           writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
@@ -318,7 +319,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // In a sharded write, single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       LOG.info("Opening writer for write operation {}", writeOperation);
-      FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+      FileBasedWriter<T> writer = writeOperation.createWriter();
       if (windowedWrites) {
         writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
             numShards);
@@ -474,7 +475,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                 ParDo.of(new WriteShardedBundles(null)));
       }
     }
-    results.setCoder(writeOperation.getFileResultCoder());
+    results.setCoder(FileResultCoder.of());
 
     if (windowedWrites) {
       // When processing streaming windowed writes, results will arrive multiple times. This
@@ -484,7 +485,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // whenever new data arrives.
       PCollection<KV<Void, FileResult>> keyedResults =
           results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
-      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation.getFileResultCoder()));
+      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of()));
 
       // Is the continuation trigger sufficient?
       keyedResults
@@ -494,7 +495,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
             public void processElement(ProcessContext c) throws Exception {
               LOG.info("Finalizing write operation {}.", writeOperation);
               List<FileResult> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
+              writeOperation.finalize(results);
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }
           }));
@@ -540,7 +541,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                     "Creating {} empty output shards in addition to {} written for a total of {}.",
                     extraShardsNeeded, results.size(), minShardsNeeded);
                 for (int i = 0; i < extraShardsNeeded; ++i) {
-                  FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  FileBasedWriter<T> writer = writeOperation.createWriter();
                   writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
                       UNKNOWN_NUMSHARDS);
                   FileResult emptyWrite = writer.close();
@@ -548,7 +549,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                 }
                 LOG.debug("Done creating extra shards.");
               }
-              writeOperation.finalize(results, c.getPipelineOptions());
+              writeOperation.finalize(results);
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }
           }).withSideInputs(sideInputs.build()));

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 0d91bbc..33913f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -41,7 +41,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -197,7 +197,7 @@ public class IOChannelUtils {
   public static WritableByteChannel create(String prefix, String shardTemplate,
       String suffix, int numShards, String mimeType) throws IOException {
     if (numShards == 1) {
-      return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
+      return create(DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, 0, 1),
                     mimeType);
     }
 
@@ -209,7 +209,7 @@ public class IOChannelUtils {
     Set<String> outputNames = new HashSet<>();
     for (int i = 0; i < numShards; i++) {
       String outputName =
-          FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
+          DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, i, numShards);
       if (!outputNames.add(outputName)) {
         throw new IllegalArgumentException(
             "Shard name collision detected for: " + outputName);

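The relocated constructName helper expands runs of 'S' with the zero-padded
shard number and runs of 'N' with the shard count, as exercised by
DefaultFilenamePolicyTest later in this commit. A tiny sketch:

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;

    public class ConstructNameSketch {
      public static void main(String[] args) {
        // Shard 1 of 123 with template "-SSS-of-NNN" and suffix ".txt".
        System.out.println(
            DefaultFilenamePolicy.constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
        // prints: output-001-of-123.txt
      }
    }
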
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
index feee6a0..1f3f5a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -33,14 +34,13 @@ public class NoopPathValidator implements PathValidator {
   }
 
   @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    return filepattern;
-  }
+  public void validateInputFilePatternSupported(String filepattern) {}
 
   @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    return filePrefix;
-  }
+  public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {}
 
   @Override
   public String verifyPath(String path) {

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index 786cdcb..e18dd96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -26,7 +26,6 @@ import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
@@ -38,6 +37,7 @@ import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -120,7 +120,7 @@ public class NumberedShardedFile implements ShardedFile {
       try {
         // Match inputPath which may contains glob
         Collection<Metadata> files = Iterables.getOnlyElement(
-            FileSystems.match(ImmutableList.of(filePattern))).metadata();
+            FileSystems.match(Collections.singletonList(filePattern))).metadata();
 
         LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);
 

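A small sketch of the FileSystems.match call used above; it takes a list of
patterns and returns one MatchResult per pattern, in order (the glob below is
a placeholder, assuming the local filesystem):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.io.fs.MatchResult.Metadata;

    public class MatchSketch {
      public static void main(String[] args) throws IOException {
        List<MatchResult> results =
            FileSystems.match(Collections.singletonList("/tmp/output/part-*"));
        for (Metadata metadata : results.get(0).metadata()) {
          System.out.println(metadata.resourceId() + " : " + metadata.sizeBytes() + " bytes");
        }
      }
    }
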
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
index a7ee16e..e69648b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.io.fs.ResourceId;
+
 /**
  * Interface for controlling validation of paths.
  */
@@ -25,17 +27,22 @@ public interface PathValidator {
    * Validate that a file pattern is conforming.
    *
    * @param filepattern The file pattern to verify.
-   * @return The post-validation filepattern.
    */
-  String validateInputFilePatternSupported(String filepattern);
+  void validateInputFilePatternSupported(String filepattern);
 
   /**
    * Validate that an output file prefix is conforming.
    *
    * @param filePrefix the file prefix to verify.
-   * @return The post-validation filePrefix.
    */
-  String validateOutputFilePrefixSupported(String filePrefix);
+  void validateOutputFilePrefixSupported(String filePrefix);
+
+  /**
+   * Validates that an output path is conforming.
+   *
+   * @param resourceId the output resource to verify.
+   */
+  void validateOutputResourceSupported(ResourceId resourceId);
 
   /**
    * Validate that a path is a valid path and that the path

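With the validators now returning void and failing by exception, a
pass-through implementation (mirroring NoopPathValidator above) reduces to:

    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.util.PathValidator;

    /** Hypothetical validator that accepts every path; for illustration only. */
    public class AcceptAllPathValidator implements PathValidator {
      @Override
      public void validateInputFilePatternSupported(String filepattern) {}

      @Override
      public void validateOutputFilePrefixSupported(String filePrefix) {}

      @Override
      public void validateOutputResourceSupported(ResourceId resourceId) {}

      @Override
      public String verifyPath(String path) {
        return path;
      }
    }
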
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 5991c96..1506aa9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -34,6 +35,8 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -52,9 +55,9 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -278,33 +281,31 @@ public class AvroIOTest {
   }
 
   private static class WindowedFilenamePolicy extends FilenamePolicy {
-    String outputFilePrefix;
+    final String outputFilePrefix;
 
     WindowedFilenamePolicy(String outputFilePrefix) {
       this.outputFilePrefix = outputFilePrefix;
     }
 
     @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(outputFilePrefix);
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext input, String extension) {
+      String filename = String.format(
+          "%s-%s-%s-of-%s-pane-%s%s%s",
+          outputFilePrefix,
+          input.getWindow(),
+          input.getShardNumber(),
+          input.getNumShards() - 1,
+          input.getPaneInfo().getIndex(),
+          input.getPaneInfo().isLast() ? "-final" : "",
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public String windowedFilename(WindowedContext input) {
-      String filename = outputFilePrefix + "-" + input.getWindow().toString() +  "-"
-          + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
-          + input.getPaneInfo().getIndex();
-      if (input.getPaneInfo().isLast()) {
-        filename += "-final";
-      }
-      return filename;
-    }
-
-    @Override
-    public String unwindowedFilename(Context input) {
-      String filename = outputFilePrefix + input.getShardNumber() + "-of-"
-          + (input.getNumShards() - 1);
-      return filename;
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context input, String extension) {
+      throw new UnsupportedOperationException("Expecting windowed outputs only");
     }
 
     @Override
@@ -320,8 +321,8 @@ public class AvroIOTest {
   @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
   public void testWindowedAvroIOWrite() throws Throwable {
-    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
-    final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+    Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite");
+    String baseFilename = baseDir.resolve("prefix").toString();
 
     Instant base = new Instant(0);
     ArrayList<GenericClass> allElements = new ArrayList<>();
@@ -349,7 +350,6 @@ public class AvroIOTest {
           secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
     }
 
-
     TimestampedValue<GenericClass>[] firstWindowArray =
         firstWindowElements.toArray(new TimestampedValue[100]);
     TimestampedValue<GenericClass>[] secondWindowArray =
@@ -364,11 +364,13 @@ public class AvroIOTest {
         Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
         .advanceWatermarkToInfinity();
 
+    FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
         .apply(AvroIO.write(GenericClass.class)
-            .to(new WindowedFilenamePolicy(outputFilePrefix))
+            .to(baseFilename)
+            .withFilenamePolicy(policy)
             .withWindowedWrites()
             .withNumShards(2));
     windowedAvroWritePipeline.run();
@@ -381,7 +383,7 @@ public class AvroIOTest {
         IntervalWindow intervalWindow = new IntervalWindow(
             windowStart, Duration.standardMinutes(1));
         expectedFiles.add(
-            new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+            new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard
                 + "-of-1" + "-pane-0-final"));
       }
     }
@@ -442,7 +444,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
-  public void testMetdata() throws Exception {
+  public void testMetadata() throws Exception {
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -481,7 +483,8 @@ public class AvroIOTest {
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
     p.run();
 
-    String shardNameTemplate = write.getShardTemplate();
+    String shardNameTemplate =
+        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -494,7 +497,7 @@ public class AvroIOTest {
     for (int i = 0; i < numShards; i++) {
       expectedFiles.add(
           new File(
-              FileBasedSink.constructName(
+              DefaultFilenamePolicy.constructName(
                   outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
     }
 
@@ -530,10 +533,10 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
+    AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
 
     DisplayData displayData = DisplayData.from(read);
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
   }
 
   @Test
@@ -542,7 +545,7 @@ public class AvroIOTest {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
     AvroIO.Read<GenericRecord> read =
-        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
+        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("AvroIO.Read should include the file pattern in its primitive transform",

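WindowedFilenamePolicy above shows the windowed half of the ResourceId-based
FilenamePolicy contract; the unwindowed half has the same shape. A
hypothetical unwindowed-only policy:

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    /** Hypothetical policy naming shards "part-SSSSS-of-NNNNN<ext>". */
    class PartNamePolicy extends FilenamePolicy {
      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context input, String extension) {
        String filename = String.format(
            "part-%05d-of-%05d%s", input.getShardNumber(), input.getNumShards(), extension);
        return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext input, String extension) {
        throw new UnsupportedOperationException("Expecting unwindowed outputs only");
      }
    }
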
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
new file mode 100644
index 0000000..c895da8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of {@link DefaultFilenamePolicy}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultFilenamePolicyTest {
+  @Test
+  public void testConstructName() {
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+
+    assertEquals("out.txt/part-00042",
+        constructName("out.txt", "/part-SSSSS", "", 42, 100));
+
+    assertEquals("out.txt",
+        constructName("ou", "t.t", "xt", 1, 1));
+
+    assertEquals("out0102shard.txt",
+        constructName("out", "SSNNshard", ".txt", 1, 2));
+
+    assertEquals("out-2/1.part-1-of-2.txt",
+        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+  }
+
+  @Test
+  public void testConstructNameWithLargeShardCount() {
+    assertEquals("out-100-of-5000.txt",
+        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 7efe47c..d9bcef4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,9 +17,10 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.io.FileBasedSink.constructName;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -37,7 +38,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,9 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.junit.Rule;
@@ -64,50 +63,28 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for FileBasedSink.
+ * Tests for {@link FileBasedSink}.
  */
 @RunWith(JUnit4.class)
 public class FileBasedSinkTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  private String baseOutputFilename = "output";
-  private String tempDirectory = "temp";
+  private final String tempDirectoryName = "temp";
 
-  private String appendToTempFolder(String filename) {
-    return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
+  private ResourceId getTemporaryFolder() {
+    return LocalResources.fromFile(tmpFolder.getRoot(), /* isDirectory */ true);
   }
 
-  private String getBaseOutputFilename() {
-    return appendToTempFolder(baseOutputFilename);
+  private ResourceId getBaseOutputDirectory() {
+    String baseOutputDirname = "output";
+    return getTemporaryFolder()
+        .resolve(baseOutputDirname, StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
-  private String getBaseTempDirectory() {
-    return appendToTempFolder(tempDirectory);
-  }
-
-  @Test
-  public void testConstructName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
-
-    assertEquals("out.txt/part-00042",
-        constructName("out.txt", "/part-SSSSS", "", 42, 100));
-
-    assertEquals("out.txt",
-        constructName("ou", "t.t", "xt", 1, 1));
-
-    assertEquals("out0102shard.txt",
-        constructName("out", "SSNNshard", ".txt", 1, 2));
-
-    assertEquals("out-2/1.part-1-of-2.txt",
-        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
-  }
-
-  @Test
-  public void testConstructNameWithLargeShardCount() {
-    assertEquals("out-100-of-5000.txt",
-        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+  private ResourceId getBaseTempDirectory() {
+    return getTemporaryFolder()
+        .resolve(tempDirectoryName, StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
   /**
@@ -117,30 +94,31 @@ public class FileBasedSinkTest {
   @Test
   public void testWriter() throws Exception {
     String testUid = "testId";
-    String expectedFilename = IOChannelUtils.resolve(getBaseTempDirectory(), testUid);
-    SimpleSink.SimpleWriter writer = buildWriter();
-
+    ResourceId expectedFile = getBaseTempDirectory()
+        .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
     List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
     List<String> expected = new ArrayList<>();
     expected.add(SimpleSink.SimpleWriter.HEADER);
     expected.addAll(values);
     expected.add(SimpleSink.SimpleWriter.FOOTER);
 
+    SimpleSink.SimpleWriter writer =
+        buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
     writer.openUnwindowed(testUid, -1, -1);
     for (String value : values) {
       writer.write(value);
     }
     FileResult result = writer.close();
 
-    assertEquals(expectedFilename, result.getFilename());
-    assertFileContains(expected, expectedFilename);
+    assertEquals(expectedFile, result.getFilename());
+    assertFileContains(expected, expectedFile);
   }
 
   /**
    * Assert that a file contains the lines provided, in the same order as expected.
    */
-  private void assertFileContains(List<String> expected, String filename) throws Exception {
-    try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
+  private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
+    try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
       List<String> actual = new ArrayList<>();
       for (;;) {
         String line = reader.readLine();
@@ -149,7 +127,7 @@ public class FileBasedSinkTest {
         }
         actual.add(line);
       }
-      assertEquals(expected, actual);
+      assertEquals("contents for " + file, expected, actual);
     }
   }
 
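The converted helpers and assertions build every path by ResourceId resolution rather than Paths.get string concatenation. A minimal, self-contained sketch of the pattern, with an illustrative base directory that is not from this commit:

    import java.io.File;
    import org.apache.beam.sdk.io.LocalResources;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    public class ResolveSketch {
      public static void main(String[] args) {
        // Wrap a local directory as a ResourceId.
        ResourceId base = LocalResources.fromFile(new File("/tmp/base"), /* isDirectory */ true);
        // RESOLVE_DIRECTORY keeps the result a directory; RESOLVE_FILE yields a leaf.
        ResourceId tempDir = base.resolve("temp", StandardResolveOptions.RESOLVE_DIRECTORY);
        ResourceId tempFile = tempDir.resolve("testId", StandardResolveOptions.RESOLVE_FILE);
        // On the local filesystem this prints /tmp/base/temp/testId.
        System.out.println(tempFile);
      }
    }
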
@@ -165,19 +143,11 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Removes temporary files when temporary and output filenames differ.
+   * Removes temporary files when temporary and output directories differ.
    */
   @Test
   public void testRemoveWithTempFilename() throws Exception {
-    testRemoveTemporaryFiles(3, tempDirectory);
-  }
-
-  /**
-   * Removes only temporary files, even if temporary and output files share the same base filename.
-   */
-  @Test
-  public void testRemoveWithSameFilename() throws Exception {
-    testRemoveTemporaryFiles(3, baseOutputFilename);
+    testRemoveTemporaryFiles(3, getBaseTempDirectory());
   }
 
   /**
@@ -205,13 +175,13 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testFinalizeWithIntermediateState() throws Exception {
-    List<File> files = generateTemporaryFilesForFinalize(3);
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    List<File> files = generateTemporaryFilesForFinalize(3);
     runFinalize(writeOp, files);
 
-    // create a temporary file
-    tmpFolder.newFolder(tempDirectory);
-    tmpFolder.newFile(tempDirectory + "/1");
+    // create a temporary file and then rerun finalize
+    tmpFolder.newFolder(tempDirectoryName);
+    tmpFolder.newFile(tempDirectoryName + "/1");
 
     runFinalize(writeOp, files);
   }
@@ -222,9 +192,9 @@ public class FileBasedSinkTest {
   private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
     List<File> temporaryFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      String temporaryFilename =
-          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i);
-      File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
+      ResourceId temporaryFile =
+          FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i);
+      File tmpFile = new File(temporaryFile.toString());
       tmpFile.getParentFile().mkdirs();
       assertTrue(tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
@@ -238,26 +208,26 @@ public class FileBasedSinkTest {
    */
   private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
       throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
     int numFiles = temporaryFiles.size();
 
     List<FileResult> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
-    for (int i = 0; i < numFiles; i++) {
-      fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
+    for (File f : temporaryFiles) {
+      ResourceId file = LocalResources.fromFile(f, false);
+      fileResults.add(new FileResult(file, null));
     }
 
-    writeOp.finalize(fileResults, options);
+    writeOp.finalize(fileResults);
 
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
     for (int i = 0; i < numFiles; i++) {
-      String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
-          new Context(i, numFiles));
-      assertTrue(new File(outputFilename).exists());
+      ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
+          .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+      assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
-    assertFalse(new File(writeOp.tempDirectory.get()).exists());
+    assertFalse(new File(writeOp.tempDirectory.get().toString()).exists());
     // Test that repeated requests of the temp directory return a stable result.
     assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get());
   }
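runFinalize resolves each shard's final name through the sink's FilenamePolicy instead of formatting strings by hand. A hedged sketch of that lookup using this test's SimpleSink; the import paths assume the nested FileBasedSink.FilenamePolicy.Context type used throughout the file:

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
    import org.apache.beam.sdk.io.fs.ResourceId;

    class ShardNameSketch {
      // Final name for shard 1 of 3, e.g. file-01-of-03.test under outputDir.
      static ResourceId shardName(SimpleSink sink, ResourceId outputDir) {
        FilenamePolicy policy = sink.getFilenamePolicy();
        // The trailing "" is the extension argument; the suffix here already carries it.
        return policy.unwindowedFilename(outputDir, new Context(1, 3), "");
      }
    }

With sink built as new SimpleSink(outputDir, "file", "-SS-of-NN", ".test"), this mirrors the loop above.
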
@@ -266,28 +236,43 @@ public class FileBasedSinkTest {
    * Create n temporary and output files and verify that removeTemporaryFiles only
    * removes temporary files.
    */
-  private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename)
+  private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(baseTemporaryFilename);
+    String prefix = "file";
+    SimpleSink sink =
+        new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+
+    FileBasedWriteOperation<String> writeOp =
+        new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      File tmpFile = new File(tmpFolder.getRoot(),
-          FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i));
+      ResourceId tempResource =
+          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+      File tmpFile = new File(tempResource.toString());
       tmpFile.getParentFile().mkdirs();
-      assertTrue(tmpFile.createNewFile());
+      assertTrue("unable to create new temp file", tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
-      File outputFile = tmpFolder.newFile(baseOutputFilename + i);
+      ResourceId outputFileId =
+          getBaseOutputDirectory().resolve(prefix + i, StandardResolveOptions.RESOLVE_FILE);
+      File outputFile = new File(outputFileId.toString());
+      outputFile.getParentFile().mkdirs();
+      assertTrue("unable to create new output file", outputFile.createNewFile());
       outputFiles.add(outputFile);
     }
 
-    writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
+    writeOp.removeTemporaryFiles(Collections.<ResourceId>emptySet(), true);
 
     for (int i = 0; i < numFiles; i++) {
-      assertFalse(temporaryFiles.get(i).exists());
-      assertTrue(outputFiles.get(i).exists());
+      File temporaryFile = temporaryFiles.get(i);
+      assertThat(
+          String.format("temp file %s exists", temporaryFile),
+          temporaryFile.exists(), is(false));
+      File outputFile = outputFiles.get(i);
+      assertThat(
+          String.format("output file %s exists", outputFile),
+          outputFile.exists(), is(true));
     }
   }
 
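Cleanup in the converted sink goes through the FileSystems API rather than direct java.io calls. A minimal sketch of a tolerant delete of one temporary file, assuming StandardMoveOptions.IGNORE_MISSING_FILES so a rerun is safe; this illustrates the API shape, not the removeTemporaryFiles implementation:

    import java.io.IOException;
    import java.util.Collections;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    class CleanupSketch {
      static void deleteTemp(ResourceId tempDir) throws IOException {
        ResourceId temp = tempDir.resolve("file0", StandardResolveOptions.RESOLVE_FILE);
        // Deleting an already-removed file is not an error with IGNORE_MISSING_FILES.
        FileSystems.delete(
            Collections.singletonList(temp), StandardMoveOptions.IGNORE_MISSING_FILES);
      }
    }
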
@@ -296,111 +281,79 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testCopyToOutputFiles() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
 
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
-        "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
+        "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
 
-    Map<String, String> inputFilePaths = new HashMap<>();
-    List<String> expectedOutputPaths = new ArrayList<>();
+    Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
+    List<ResourceId> expectedOutputPaths = new ArrayList<>();
 
     for (int i = 0; i < inputFilenames.size(); i++) {
       // Generate output paths.
-      File outputFile = tmpFolder.newFile(expectedOutputFilenames.get(i));
-      expectedOutputPaths.add(outputFile.toString());
+      expectedOutputPaths.add(
+          getBaseOutputDirectory()
+              .resolve(expectedOutputFilenames.get(i), StandardResolveOptions.RESOLVE_FILE));
 
       // Generate and write to input paths.
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
-      List<String> lines = Arrays.asList(inputContents.get(i));
+      List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(inputTmpFile.toString(),
-          writeOp.getSink().getFileNamePolicy().unwindowedFilename(
-              new Context(i, inputFilenames.size())));
+      inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
+          writeOp.getSink().getFilenamePolicy()
+              .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
     }
 
     // Copy input files to output files.
-    writeOp.copyToOutputFiles(inputFilePaths, options);
+    writeOp.copyToOutputFiles(inputFilePaths);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
-      assertFileContains(Arrays.asList(inputContents.get(i)), expectedOutputPaths.get(i));
+      assertFileContains(
+          Collections.singletonList(inputContents.get(i)), expectedOutputPaths.get(i));
     }
   }
 
-  public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
-    List<String> filenames = new ArrayList<>();
+  public List<ResourceId> generateDestinationFilenames(
+      ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
+    List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+      filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
     }
     return filenames;
   }
 
   /**
-   * Output filenames use the supplied naming template.
-   */
-  @Test
-  public void testGenerateOutputFilenamesWithTemplate() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
-    FilenamePolicy policy = sink.getFileNamePolicy();
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
-        appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = generateDestinationFilenames(policy, 3);
-    assertEquals(expected, actual);
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = generateDestinationFilenames(policy, 1);
-    assertEquals(expected, actual);
-
-    expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
-    assertEquals(expected, actual);
-
-    // Also validate that we handle the case where the user specified "." that we do
-    // not prefix an additional "." making "..test"
-    sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
-    expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
-        appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = generateDestinationFilenames(policy, 3);
-    assertEquals(expected, actual);
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = generateDestinationFilenames(policy, 1);
-    assertEquals(expected, actual);
-
-    expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
-    assertEquals(expected, actual);
-  }
-
-  /**
    * Output filenames are generated correctly when an extension is supplied.
    */
   @Test
-  public void testGenerateOutputFilenamesWithExtension() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
+  public void testGenerateOutputFilenames() {
+    List<ResourceId> expected;
+    List<ResourceId> actual;
+    ResourceId root = getBaseOutputDirectory();
+
+    SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
+    FilenamePolicy policy = sink.getFilenamePolicy();
 
     expected = Arrays.asList(
-        appendToTempFolder("output-00000-of-00003.test"),
-        appendToTempFolder("output-00001-of-00003.test"),
-        appendToTempFolder("output-00002-of-00003.test"));
-    actual = generateDestinationFilenames(policy, 3);
+        root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
-    actual = generateDestinationFilenames(policy, 1);
+    expected = Collections.singletonList(
+        root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
+    actual = generateDestinationFilenames(root, policy, 0);
     assertEquals(expected, actual);
   }
 
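Both filename tests lean on shard-template expansion, now in DefaultFilenamePolicy.constructName (see the TextIOTest hunk further down): runs of 'S' become the zero-padded shard number and runs of 'N' the shard count, widening past the template width when needed, as the removed large-shard-count test showed. A quick illustration:

    // constructName(prefix, shardTemplate, suffix, shardNum, numShards)
    DefaultFilenamePolicy.constructName("file", "-SSSSS-of-NNNNN", ".test", 1, 3);
    // -> "file-00001-of-00003.test"
    DefaultFilenamePolicy.constructName("out", "-SS-of-NN", ".txt", 100, 5000);
    // -> "out-100-of-5000.txt" (the count needs more digits than the template)
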
@@ -408,16 +361,21 @@ public class FileBasedSinkTest {
    * Reject non-distinct output filenames.
    */
   @Test
-  public void testCollidingOutputFilenames() {
-    SimpleSink sink = new SimpleSink("output", "test", "-NN");
+  public void testCollidingOutputFilenames() throws IOException {
+    ResourceId root = getBaseOutputDirectory();
+    SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
     SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
 
+    ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
       Iterable<FileResult> results = Lists.newArrayList(
-          new FileResult("temp1", "file1"),
-          new FileResult("temp2", "file1"),
-          new FileResult("temp3", "file1"));
+          new FileResult(temp1, output),
+          new FileResult(temp2, output),
+          new FileResult(temp3, output));
 
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
@@ -432,22 +390,28 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testGenerateOutputFilenamesWithoutExtension() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
-    FilenamePolicy policy = sink.getFileNamePolicy();
-
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
-        appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
-    actual = generateDestinationFilenames(policy, 3);
+    List<ResourceId> expected;
+    List<ResourceId> actual;
+    ResourceId root = getBaseOutputDirectory();
+    SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
+    FilenamePolicy policy = sink.getFilenamePolicy();
+
+    expected = Arrays.asList(
+        root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
-    actual = generateDestinationFilenames(policy, 1);
+    expected = Collections.singletonList(
+        root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
+    actual = generateDestinationFilenames(root, policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -511,7 +475,7 @@ public class FileBasedSinkTest {
 
   private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
       String... values)
-      throws IOException, FileNotFoundException {
+      throws IOException {
     final File file = tmpFolder.newFile("test.gz");
     final WritableByteChannel channel =
         factory.create(Channels.newChannel(new FileOutputStream(file)));
@@ -529,12 +493,13 @@ public class FileBasedSinkTest {
   @Test
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
-    SimpleSink.SimpleWriteOperation writeOp =
-        new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
+    ResourceId root = getBaseOutputDirectory();
+    FileBasedWriteOperation<String> writeOp =
+        new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final FileBasedWriter<String> writer =
-        writeOp.createWriter(null);
-    final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);
+    final FileBasedWriter<String> writer = writeOp.createWriter();
+    final ResourceId expectedFile =
+        writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 
     final List<String> expected = new ArrayList<>();
     expected.add("header");
@@ -551,38 +516,29 @@ public class FileBasedSinkTest {
     writer.write("b");
     final FileResult result = writer.close();
 
-    assertEquals(expectedFilename, result.getFilename());
-    assertFileContains(expected, expectedFilename);
+    assertEquals(expectedFile, result.getFilename());
+    assertFileContains(expected, expectedFile);
   }
 
   /**
    * Build a SimpleSink with default options.
    */
   private SimpleSink buildSink() {
-    return new SimpleSink(getBaseOutputFilename(), "test");
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
   }
 
   /**
-   * Build a SimpleWriteOperation with default options and the given base temporary filename.
+   * Build a SimpleWriteOperation with default options and the given temporary directory.
    */
-  private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) {
+  private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
     SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, appendToTempFolder(baseTemporaryFilename));
+    return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
   }
 
   /**
    * Build a write operation with the default options for it and its parent sink.
    */
   private SimpleSink.SimpleWriteOperation buildWriteOperation() {
-    SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory());
-  }
-
-  /**
-   * Build a writer with the default options for its parent write operation and sink.
-   */
-  private SimpleSink.SimpleWriter buildWriter() {
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    return new SimpleSink.SimpleWriter(writeOp);
+    return buildSink().createWriteOperation();
   }
 }

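Taken together, the rewritten helpers drive the whole write path through ResourceIds: build a sink, create its write operation, write a bundle, finalize. A minimal end-to-end sketch using this test's SimpleSink (paths illustrative, error handling elided):

    import java.io.File;
    import java.util.Collections;
    import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
    import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
    import org.apache.beam.sdk.io.LocalResources;
    import org.apache.beam.sdk.io.fs.ResourceId;

    class WriteSketch {
      static void writeOneShard() throws Exception {
        ResourceId outputDir =
            LocalResources.fromFile(new File("/tmp/out"), /* isDirectory */ true);
        SimpleSink sink = new SimpleSink(outputDir, "file", "-SS-of-NN", ".test");
        FileBasedWriteOperation<String> writeOp = sink.createWriteOperation();
        FileBasedWriter<String> writer = writeOp.createWriter();
        writer.openUnwindowed("testId", -1, -1);  // unwindowed bundle, sharding unknown
        writer.write("line");
        // Finalizing copies the temp file to its file-XX-of-YY.test name and cleans up.
        writeOp.finalize(Collections.singletonList(writer.close()));
      }
    }
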
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index f83642a..9265520 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,24 +19,25 @@ package org.apache.beam.sdk.io;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /**
- * A simple FileBasedSink that writes String values as lines with header and footer lines.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with
+ * header and footer.
  */
 class SimpleSink extends FileBasedSink<String> {
-  public SimpleSink(String baseOutputFilename, String extension) {
-    super(baseOutputFilename, extension);
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
+    this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
   }
 
-  public SimpleSink(String baseOutputFilename, String extension,
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
                     WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, extension, writableByteChannelFactory);
-  }
-
-  public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    super(baseOutputFilename, extension, fileNamingTemplate);
+    super(
+        StaticValueProvider.of(baseOutputDirectory),
+        new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
+        writableByteChannelFactory);
   }
 
   @Override
@@ -45,8 +46,8 @@ class SimpleSink extends FileBasedSink<String> {
   }
 
   static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
-    public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
-      super(sink, tempOutputFilename);
+    public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
+      super(sink, tempOutputDirectory);
     }
 
     public SimpleWriteOperation(SimpleSink sink) {
@@ -54,7 +55,7 @@ class SimpleSink extends FileBasedSink<String> {
     }
 
     @Override
-    public SimpleWriter createWriter(PipelineOptions options) throws Exception {
+    public SimpleWriter createWriter() throws Exception {
       return new SimpleWriter(this);
     }
   }

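The constructor rewrite above is the core of the conversion: FileBasedSink now takes a ValueProvider for the base output directory plus a FilenamePolicy. The policy can also be built standalone; a short sketch mirroring the call in this hunk:

    import org.apache.beam.sdk.io.DefaultFilenamePolicy;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    // Same shape as the DefaultFilenamePolicy constructed by SimpleSink above.
    DefaultFilenamePolicy policy = new DefaultFilenamePolicy(
        StaticValueProvider.of("file"), "-SS-of-NN", ".test");
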
http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 66b605f..685da82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
@@ -28,7 +29,6 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.apache.beam.sdk.util.IOChannelUtils.resolve;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -62,6 +62,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
@@ -73,6 +74,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -80,19 +83,16 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -101,7 +101,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for TextIO Read and Write transforms.
+ * Tests for {@link TextIO} {@link TextIO.Read} and {@link TextIO.Write} transforms.
  */
 // TODO: Change the tests to use ValidatesRunner instead of NeedsRunner
 @RunWith(JUnit4.class)
@@ -168,7 +168,6 @@ public class TextIOTest {
 
   @BeforeClass
   public static void setupClass() throws IOException {
-    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
     tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
@@ -314,7 +313,7 @@ public class TextIOTest {
     p.run();
 
     assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        write.getShardTemplate());
+        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
   }
 
   public static void assertOutputFiles(
@@ -328,17 +327,18 @@ public class TextIOTest {
       throws Exception {
     List<File> expectedFiles = new ArrayList<>();
     if (numShards == 0) {
-      String pattern =
-          resolve(rootLocation.toAbsolutePath().toString(), outputName + "*");
-      for (String expected : IOChannelUtils.getFactory(pattern).match(pattern)) {
-        expectedFiles.add(new File(expected));
+      String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString();
+      List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
+      for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
+        expectedFiles.add(new File(expectedFile.resourceId().toString()));
       }
     } else {
       for (int i = 0; i < numShards; i++) {
         expectedFiles.add(
             new File(
                 rootLocation.toString(),
-                FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
+                DefaultFilenamePolicy.constructName(
+                    outputName, shardNameTemplate, "", i, numShards)));
       }
     }
 
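assertOutputFiles now expands its glob through FileSystems.match: one spec in, one MatchResult out, with per-file Metadata behind it. A condensed, self-contained sketch of the same lookup (the glob is illustrative):

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    import com.google.common.collect.Iterables;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.io.fs.MatchResult.Metadata;

    class MatchSketch {
      static void listOutputs() throws IOException {
        List<MatchResult> matches =
            FileSystems.match(Collections.singletonList("/tmp/out/file*"));
        // One MatchResult per spec; metadata() lists the files behind the glob.
        for (Metadata m : Iterables.getOnlyElement(matches).metadata()) {
          System.out.println(m.resourceId());
        }
      }
    }
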
@@ -483,7 +483,7 @@ public class TextIOTest {
   @Test
   public void testWriteDisplayData() {
     TextIO.Write write = TextIO.write()
-        .to("foo")
+        .to("/foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
         .withNumShards(100)
@@ -492,7 +492,7 @@ public class TextIOTest {
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+    assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
     assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
     assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
@@ -523,23 +523,6 @@ public class TextIOTest {
     assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
   }
 
-  @Test
-  @Category(ValidatesRunner.class)
-  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
-  public void testPrimitiveWriteDisplayData() throws IOException {
-    PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
-    String tempRoot = options.as(TestPipelineOptions.class).getTempRoot();
-    String outputPath = IOChannelUtils.getFactory(tempRoot).resolve(tempRoot, "foobar");
-
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
-    TextIO.Write write = TextIO.write().to(outputPath);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("TextIO.Write should include the file prefix in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasValue(startsWith(outputPath)))));
-  }
-
   /** Options for testing. */
   public interface RuntimeTestOptions extends PipelineOptions {
     ValueProvider<String> getInput();