You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/05/04 07:17:42 UTC

[36/50] [abbrv] beam git commit: Convert WriteFiles/FileBasedSink from IOChannelFactory to FileSystems

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index fe0b97d..3198829 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -36,10 +36,12 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Bounded;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -73,9 +75,9 @@ public class TFRecordIO {
    */
   public static Write write() {
     return new AutoValue_TFRecordIO_Write.Builder()
-        .setFilenameSuffix("")
+        .setShardTemplate(null)
+        .setFilenameSuffix(null)
         .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
         .setCompressionType(CompressionType.NONE)
         .build();
   }
@@ -212,7 +214,7 @@ public class TFRecordIO {
 
     @Override
     protected Coder<byte[]> getDefaultOutputCoder() {
-      return DEFAULT_BYTE_ARRAY_CODER;
+      return ByteArrayCoder.of();
     }
   }
 
@@ -221,20 +223,17 @@ public class TFRecordIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<byte[]>, PDone> {
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
-    /** The prefix of each file written, combined with suffix and shardTemplate. */
-    @Nullable
-    abstract ValueProvider<String> getFilenamePrefix();
+    /** The prefix of each file written, combined with suffix and shardTemplate. */
+    @Nullable abstract ValueProvider<ResourceId> getOutputPrefix();
 
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    abstract String getFilenameSuffix();
+    @Nullable abstract String getFilenameSuffix();
 
     /** Requested number of shards. 0 for automatic. */
     abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    abstract String getShardTemplate();
+    @Nullable abstract String getShardTemplate();
 
     /** Option to indicate the output sink's compression type. Default is NONE. */
     abstract CompressionType getCompressionType();
@@ -243,38 +242,51 @@ public class TFRecordIO {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+      abstract Builder setOutputPrefix(ValueProvider<ResourceId> outputPrefix);
+
+      abstract Builder setShardTemplate(String shardTemplate);
 
       abstract Builder setFilenameSuffix(String filenameSuffix);
 
       abstract Builder setNumShards(int numShards);
 
-      abstract Builder setShardTemplate(String shardTemplate);
-
       abstract Builder setCompressionType(CompressionType compressionType);
 
       abstract Write build();
     }
 
     /**
-     * Writes to TFRecord file(s) with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes TFRecord file(s) with the given output prefix. The {@code outputPrefix} will be used
+     * to generate a {@link ResourceId} using any supported {@link FileSystem}.
+     *
+     * <p>In addition to their prefix, created files will have a shard identifier (see
+     * {@link #withNumShards(int)}), and end in a common suffix, if given by
+     * {@link #withSuffix(String)}.
+     *
+     * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
+     */
+    public Write to(String outputPrefix) {
+      return to(FileBasedSink.convertToFileResourceIfPossible(outputPrefix));
+    }
+
+    /**
+     * Writes TFRecord file(s) with a prefix given by the specified resource.
+     *
+     * <p>In addition to their prefix, created files will have a shard identifier (see
+     * {@link #withNumShards(int)}), and end in a common suffix, if given by
+     * {@link #withSuffix(String)}.
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards(int)}, and end
-     * in a common extension, if given by {@link #withSuffix(String)}.
+     * <p>For more information on filenames, see {@link DefaultFilenamePolicy}.
      */
-    public Write to(String filenamePrefix) {
-      return to(StaticValueProvider.of(filenamePrefix));
+    public Write to(ResourceId outputResource) {
+      return toResource(StaticValueProvider.of(outputResource));
     }
 
     /**
-     * Like {@link #to(String)}, but with a {@link ValueProvider}.
+     * Like {@link #to(ResourceId)}, but with a {@link ValueProvider}.
      */
-    public Write to(ValueProvider<String> filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    public Write toResource(ValueProvider<ResourceId> outputResource) {
+      return toBuilder().setOutputPrefix(outputResource).build();
     }
 
     /**
@@ -282,8 +294,8 @@ public class TFRecordIO {
      *
      * @see ShardNameTemplate
      */
-    public Write withSuffix(String nameExtension) {
-      return toBuilder().setFilenameSuffix(nameExtension).build();
+    public Write withSuffix(String suffix) {
+      return toBuilder().setFilenameSuffix(suffix).build();
     }
 
     /**
@@ -298,7 +310,7 @@ public class TFRecordIO {
      * @see ShardNameTemplate
      */
     public Write withNumShards(int numShards) {
-      checkArgument(numShards >= 0);
+      checkArgument(numShards >= 0, "Number of shards %s must be >= 0", numShards);
       return toBuilder().setNumShards(numShards).build();
     }
 
@@ -338,16 +350,13 @@ public class TFRecordIO {
 
     @Override
     public PDone expand(PCollection<byte[]> input) {
-      if (getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of a TFRecordIO.Write transform");
-     }
-      org.apache.beam.sdk.io.WriteFiles<byte[]> write =
-          org.apache.beam.sdk.io.WriteFiles.to(
+      checkState(getOutputPrefix() != null,
+          "need to set the output prefix of a TFRecordIO.Write transform");
+      WriteFiles<byte[]> write = WriteFiles.to(
               new TFRecordSink(
-                  getFilenamePrefix(),
-                  getFilenameSuffix(),
+                  getOutputPrefix(),
                   getShardTemplate(),
+                  getFilenameSuffix(),
                   getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
@@ -359,20 +368,23 @@ public class TFRecordIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      String prefixString = getFilenamePrefix().isAccessible()
-          ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+      String outputPrefixString = null;
+      if (getOutputPrefix().isAccessible()) {
+        ResourceId dir = getOutputPrefix().get();
+        outputPrefixString = dir.toString();
+      } else {
+        outputPrefixString = getOutputPrefix().toString();
+      }
       builder
-          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+          .add(DisplayData.item("filePrefix", outputPrefixString)
               .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-              .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-                  .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+              .withLabel("Output File Suffix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+                  .withLabel("Output Shard Name Template"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
               .withLabel("Maximum Output Shards"), 0)
-          .add(DisplayData
-              .item("compressionType", getCompressionType().toString())
+          .add(DisplayData.item("compressionType", getCompressionType().toString())
               .withLabel("Compression Type"));
     }
 
@@ -537,14 +549,24 @@ public class TFRecordIO {
   @VisibleForTesting
   static class TFRecordSink extends FileBasedSink<byte[]> {
     @VisibleForTesting
-    TFRecordSink(ValueProvider<String> baseOutputFilename,
-                 String extension,
-                 String fileNameTemplate,
-                 TFRecordIO.CompressionType compressionType) {
-      super(baseOutputFilename, extension, fileNameTemplate,
+    TFRecordSink(ValueProvider<ResourceId> outputPrefix,
+        @Nullable String shardTemplate,
+        @Nullable String suffix,
+        TFRecordIO.CompressionType compressionType) {
+      super(
+          outputPrefix,
+          DefaultFilenamePolicy.constructUsingStandardParameters(
+              outputPrefix, shardTemplate, suffix),
           writableByteChannelFactory(compressionType));
     }
 
+    private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
+      @Override
+      public ResourceId apply(ResourceId input) {
+        return input.getCurrentDirectory();
+      }
+    }
+
     @Override
     public FileBasedWriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
@@ -575,7 +597,7 @@ public class TFRecordIO {
       }
 
       @Override
-      public FileBasedWriter<byte[]> createWriter(PipelineOptions options) throws Exception {
+      public FileBasedWriter<byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6b08e1f..dbfaeee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import javax.annotation.Nullable;
@@ -28,9 +29,12 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -39,16 +43,13 @@ import org.apache.beam.sdk.values.PDone;
 /**
  * {@link PTransform}s for reading and writing text files.
  *
- * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}.
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
+ * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
+ * file(s) to be read.
  *
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
- * each corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n',
- * '\r', or '\r\n').
+ * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
+ * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
+ * or '\r\n').
  *
  * <p>Example:
  *
@@ -56,16 +57,11 @@ import org.apache.beam.sdk.values.PDone;
  * Pipeline p = ...;
  *
  * // A simple Read of a local file (only runs locally):
- * PCollection<String> lines =
- *     p.apply(TextIO.read().from("/local/path/to/file.txt"));
+ * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
- * <p>To write a {@link PCollection} to one or more text files, use
- * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}).
+ * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
+ * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
@@ -75,8 +71,7 @@ import org.apache.beam.sdk.values.PDone;
  * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be
  * set, and unique windows and triggers must produce unique filenames.
  *
- * <p>Any existing files with the same names as generated output files
- * will be overwritten.
+ * <p>Any existing files with the same names as generated output files will be overwritten.
  *
  * <p>For example:
  * <pre>{@code
@@ -93,25 +88,27 @@ import org.apache.beam.sdk.values.PDone;
  */
 public class TextIO {
   /**
-   * Reads from one or more text files and returns a bounded {@link PCollection} containing one
-   * element for each line of the input files.
+   * A {@link PTransform} that reads from one or more text files and returns a bounded
+   * {@link PCollection} containing one element for each line of the input files.
    */
   public static Read read() {
     return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
   }
 
   /**
-   * A {@link PTransform} that writes a {@link PCollection} to text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
+   * A {@link PTransform} that writes a {@link PCollection} to a text file (or multiple text files
+   * matching a sharding pattern), with each element of the input collection encoded into its own
+   * line.
    */
   public static Write write() {
     return new AutoValue_TextIO_Write.Builder()
-        .setFilenameSuffix("")
-        .setNumShards(0)
-        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setFilenamePrefix(null)
+        .setShardTemplate(null)
+        .setFilenameSuffix(null)
+        .setFilenamePolicy(null)
         .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
         .setWindowedWrites(false)
+        .setNumShards(0)
         .build();
   }
 
@@ -228,13 +225,11 @@ public class TextIO {
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write extends PTransform<PCollection<String>, PDone> {
-    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
     /** The prefix of each file written, combined with suffix and shardTemplate. */
-    @Nullable abstract ValueProvider<String> getFilenamePrefix();
+    @Nullable abstract ValueProvider<ResourceId> getFilenamePrefix();
 
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    abstract String getFilenameSuffix();
+    @Nullable abstract String getFilenameSuffix();
 
     /** An optional header to add to each file. */
     @Nullable abstract String getHeader();
@@ -246,7 +241,7 @@ public class TextIO {
     abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    abstract String getShardTemplate();
+    @Nullable abstract String getShardTemplate();
 
     /** A policy for naming output files. */
     @Nullable abstract FilenamePolicy getFilenamePolicy();
@@ -264,13 +259,13 @@ public class TextIO {
 
     @AutoValue.Builder
     abstract static class Builder {
-      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
-      abstract Builder setFilenameSuffix(String filenameSuffix);
-      abstract Builder setHeader(String header);
-      abstract Builder setFooter(String footer);
+      abstract Builder setFilenamePrefix(ValueProvider<ResourceId> filenamePrefix);
+      abstract Builder setShardTemplate(@Nullable String shardTemplate);
+      abstract Builder setFilenameSuffix(@Nullable String filenameSuffix);
+      abstract Builder setHeader(@Nullable String header);
+      abstract Builder setFooter(@Nullable String footer);
+      abstract Builder setFilenamePolicy(@Nullable FilenamePolicy filenamePolicy);
       abstract Builder setNumShards(int numShards);
-      abstract Builder setShardTemplate(String shardTemplate);
-      abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
       abstract Builder setWindowedWrites(boolean windowedWrites);
       abstract Builder setWritableByteChannelFactory(
           WritableByteChannelFactory writableByteChannelFactory);
@@ -279,72 +274,115 @@ public class TextIO {
     }
 
     /**
-     * Writes to text files with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or using remote execution).
+     * Writes to text files with the given prefix. The given {@code prefix} can reference any
+     * {@link FileSystem} on the classpath.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will be used, built using the specified prefix
+     * to define the base output directory and file prefix, a shard identifier (see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
      *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link #withNumShards(int)}, and end
-     * in a common extension, if given by {@link #withSuffix(String)}.
+     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+     * not be set.
      */
     public Write to(String filenamePrefix) {
-      return to(StaticValueProvider.of(filenamePrefix));
+      return to(FileBasedSink.convertToFileResourceIfPossible(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
-    public Write to(ValueProvider<String> filenamePrefix) {
-      return toBuilder().setFilenamePrefix(filenamePrefix).build();
+    /**
+     * Writes to text files with prefix from the given resource.
+     *
+     * <p>The name of the output files will be determined by the {@link FilenamePolicy} used.
+     *
+     * <p>By default, a {@link DefaultFilenamePolicy} will be used, built using the specified prefix
+     * to define the base output directory and file prefix, a shard identifier (see
+     * {@link #withNumShards(int)}), and a common suffix (if supplied using
+     * {@link #withSuffix(String)}).
+     *
+     * <p>This default policy can be overridden using {@link #withFilenamePolicy(FilenamePolicy)},
+     * in which case {@link #withShardNameTemplate(String)} and {@link #withSuffix(String)} should
+     * not be set.
+     */
+    public Write to(ResourceId filenamePrefix) {
+      return toResource(StaticValueProvider.of(filenamePrefix));
     }
 
-    /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
-    public Write to(FilenamePolicy filenamePolicy) {
-      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    /**
+     * Like {@link #to(String)}, but with a {@link ValueProvider}.
+     */
+    public Write to(ValueProvider<String> outputPrefix) {
+      return toResource(NestedValueProvider.of(outputPrefix,
+          new SerializableFunction<String, ResourceId>() {
+            @Override
+            public ResourceId apply(String input) {
+              return FileBasedSink.convertToFileResourceIfPossible(input);
+            }
+          }));
     }
 
     /**
-     * Writes to the file(s) with the given filename suffix.
-     *
-     * @see ShardNameTemplate
+     * Like {@link #to(ResourceId)}, but with a {@link ValueProvider}.
      */
-    public Write withSuffix(String nameExtension) {
-      return toBuilder().setFilenameSuffix(nameExtension).build();
+    public Write toResource(ValueProvider<ResourceId> filenamePrefix) {
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /**
-     * Uses the provided shard count.
+     * Uses the given {@link ShardNameTemplate} for naming output files. This option may only be
+     * used when {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
+     */
+    public Write withShardNameTemplate(String shardTemplate) {
+      return toBuilder().setShardTemplate(shardTemplate).build();
+    }
+
+    /**
+     * Configures the filename suffix for written files. This option may only be used when
+     * {@link #withFilenamePolicy(FilenamePolicy)} has not been configured.
      *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
-     * @see ShardNameTemplate
+     * <p>See {@link DefaultFilenamePolicy} for how the prefix, shard name template, and suffix are
+     * used.
      */
-    public Write withNumShards(int numShards) {
-      checkArgument(numShards >= 0);
-      return toBuilder().setNumShards(numShards).build();
+    public Write withSuffix(String filenameSuffix) {
+      return toBuilder().setFilenameSuffix(filenameSuffix).build();
     }
 
     /**
-     * Uses the given shard name template.
+     * Configures the {@link FileBasedSink.FilenamePolicy} that will be used to name written files.
+     */
+    public Write withFilenamePolicy(FilenamePolicy filenamePolicy) {
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
+    }
+
+    /**
+     * Configures the number of output shards produced overall (when using unwindowed writes) or
+     * per-window (when using windowed writes).
+     *
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
-     * @see ShardNameTemplate
+     * @param numShards the number of shards to use, or 0 to let the system decide.
      */
-    public Write withShardNameTemplate(String shardTemplate) {
-      return toBuilder().setShardTemplate(shardTemplate).build();
+    public Write withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
-     * Forces a single file as output.
+     * Forces a single file as output and empty shard name template. This option is only compatible
+     * with unwindowed writes.
      *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Using this setting is not recommended
-     * unless you truly require a single output file.
+     * <p>For unwindowed writes, constraining the number of shards is likely to reduce the
+     * performance of a pipeline. Setting this value is not recommended unless you require a
+     * specific number of output files.
      *
-     * <p>This is a shortcut for
-     * {@code .withNumShards(1).withShardNameTemplate("")}
+     * <p>This is equivalent to {@code .withNumShards(1).withShardNameTemplate("")}
      */
     public Write withoutSharding() {
       return withNumShards(1).withShardNameTemplate("");
@@ -386,34 +424,26 @@ public class TextIO {
 
     @Override
     public PDone expand(PCollection<String> input) {
-      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
-        throw new IllegalStateException(
-            "need to set the filename prefix of an TextIO.Write transform");
-      }
-      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
-        throw new IllegalStateException(
-            "cannot set both a filename policy and a filename prefix");
-      }
-      WriteFiles<String> write;
-      if (getFilenamePolicy() != null) {
-        write =
-            WriteFiles.to(
-                new TextSink(
-                    getFilenamePolicy(),
-                    getHeader(),
-                    getFooter(),
-                    getWritableByteChannelFactory()));
-      } else {
-        write =
-            WriteFiles.to(
-                new TextSink(
-                    getFilenamePrefix(),
-                    getFilenameSuffix(),
-                    getHeader(),
-                    getFooter(),
-                    getShardTemplate(),
-                    getWritableByteChannelFactory()));
+      checkState(getFilenamePrefix() != null,
+          "Need to set the filename prefix of a TextIO.Write transform.");
+      checkState(
+          (getFilenamePolicy() == null)
+              || (getShardTemplate() == null && getFilenameSuffix() == null),
+          "Cannot set a filename policy and also a filename template or suffix.");
+
+      FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
+      if (usedFilenamePolicy == null) {
+        usedFilenamePolicy = DefaultFilenamePolicy.constructUsingStandardParameters(
+            getFilenamePrefix(), getShardTemplate(), getFilenameSuffix());
       }
+      WriteFiles<String> write =
+          WriteFiles.to(
+              new TextSink(
+                  getFilenamePrefix(),
+                  usedFilenamePolicy,
+                  getHeader(),
+                  getFooter(),
+                  getWritableByteChannelFactory()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -430,16 +460,15 @@ public class TextIO {
       String prefixString = "";
       if (getFilenamePrefix() != null) {
         prefixString = getFilenamePrefix().isAccessible()
-            ? getFilenamePrefix().get() : getFilenamePrefix().toString();
+            ? getFilenamePrefix().get().toString() : getFilenamePrefix().toString();
       }
       builder
           .addIfNotNull(DisplayData.item("filePrefix", prefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
-            .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
-            .withLabel("Output Shard Name Template"),
-              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotNull(DisplayData.item("fileSuffix", getFilenameSuffix())
+            .withLabel("Output File Suffix"))
+          .addIfNotNull(DisplayData.item("shardNameTemplate", getShardTemplate())
+            .withLabel("Output Shard Name Template"))
           .addIfNotDefault(DisplayData.item("numShards", getNumShards())
             .withLabel("Maximum Output Shards"), 0)
           .addIfNotNull(DisplayData.item("fileHeader", getHeader())

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 4efdc32..0ba537e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -23,7 +23,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
@@ -39,27 +39,15 @@ class TextSink extends FileBasedSink<String> {
   @Nullable private final String footer;
 
   TextSink(
+      ValueProvider<ResourceId> baseOutputFilename,
       FilenamePolicy filenamePolicy,
       @Nullable String header,
       @Nullable String footer,
       WritableByteChannelFactory writableByteChannelFactory) {
-    super(filenamePolicy, writableByteChannelFactory);
+    super(baseOutputFilename, filenamePolicy, writableByteChannelFactory);
     this.header = header;
     this.footer = footer;
   }
-
-  TextSink(
-      ValueProvider<String> baseOutputFilename,
-      String extension,
-      @Nullable String header,
-      @Nullable String footer,
-      String fileNameTemplate,
-      WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, extension, fileNameTemplate, writableByteChannelFactory);
-    this.header = header;
-    this.footer = footer;
-  }
-
   @Override
   public FileBasedWriteOperation<String> createWriteOperation() {
     return new TextWriteOperation(this, header, footer);
@@ -77,7 +65,7 @@ class TextSink extends FileBasedSink<String> {
     }
 
     @Override
-    public FileBasedWriter<String> createWriter(PipelineOptions options) throws Exception {
+    public FileBasedWriter<String> createWriter() throws Exception {
       return new TextWriter(this, header, footer);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index dcd600f..2a057e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -254,7 +255,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // Lazily initialize the Writer
       if (writer == null) {
         LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter(c.getPipelineOptions());
+        writer = writeOperation.createWriter();
 
         if (windowedWrites) {
           writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
@@ -318,7 +319,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // In a sharded write, single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       LOG.info("Opening writer for write operation {}", writeOperation);
-      FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+      FileBasedWriter<T> writer = writeOperation.createWriter();
       if (windowedWrites) {
         writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
             numShards);
@@ -474,7 +475,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                 ParDo.of(new WriteShardedBundles(null)));
       }
     }
-    results.setCoder(writeOperation.getFileResultCoder());
+    results.setCoder(FileResultCoder.of());
 
     if (windowedWrites) {
       // When processing streaming windowed writes, results will arrive multiple times. This
@@ -484,7 +485,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // whenever new data arrives.
       PCollection<KV<Void, FileResult>> keyedResults =
           results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
-      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), writeOperation.getFileResultCoder()));
+      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of()));
 
       // Is the continuation trigger sufficient?
       keyedResults
@@ -494,7 +495,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
             public void processElement(ProcessContext c) throws Exception {
               LOG.info("Finalizing write operation {}.", writeOperation);
               List<FileResult> results = Lists.newArrayList(c.element().getValue());
-              writeOperation.finalize(results, c.getPipelineOptions());
+              writeOperation.finalize(results);
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }
           }));
@@ -540,7 +541,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                     "Creating {} empty output shards in addition to {} written for a total of {}.",
                     extraShardsNeeded, results.size(), minShardsNeeded);
                 for (int i = 0; i < extraShardsNeeded; ++i) {
-                  FileBasedWriter<T> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  FileBasedWriter<T> writer = writeOperation.createWriter();
                   writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
                       UNKNOWN_NUMSHARDS);
                   FileResult emptyWrite = writer.close();
@@ -548,7 +549,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                 }
                 LOG.debug("Done creating extra shards.");
               }
-              writeOperation.finalize(results, c.getPipelineOptions());
+              writeOperation.finalize(results);
               LOG.debug("Done finalizing write operation {}", writeOperation);
             }
           }).withSideInputs(sideInputs.build()));

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
index 0d91bbc..33913f8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelUtils.java
@@ -41,7 +41,7 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nonnull;
-import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.DefaultFilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
@@ -197,7 +197,7 @@ public class IOChannelUtils {
   public static WritableByteChannel create(String prefix, String shardTemplate,
       String suffix, int numShards, String mimeType) throws IOException {
     if (numShards == 1) {
-      return create(FileBasedSink.constructName(prefix, shardTemplate, suffix, 0, 1),
+      return create(DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, 0, 1),
                     mimeType);
     }
 
@@ -209,7 +209,7 @@ public class IOChannelUtils {
     Set<String> outputNames = new HashSet<>();
     for (int i = 0; i < numShards; i++) {
       String outputName =
-          FileBasedSink.constructName(prefix, shardTemplate, suffix, i, numShards);
+          DefaultFilenamePolicy.constructName(prefix, shardTemplate, suffix, i, numShards);
       if (!outputNames.add(outputName)) {
         throw new IllegalArgumentException(
             "Shard name collision detected for: " + outputName);

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
index feee6a0..1f3f5a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NoopPathValidator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /**
@@ -33,14 +34,13 @@ public class NoopPathValidator implements PathValidator {
   }
 
   @Override
-  public String validateInputFilePatternSupported(String filepattern) {
-    return filepattern;
-  }
+  public void validateInputFilePatternSupported(String filepattern) {}
 
   @Override
-  public String validateOutputFilePrefixSupported(String filePrefix) {
-    return filePrefix;
-  }
+  public void validateOutputFilePrefixSupported(String filePrefix) {}
+
+  @Override
+  public void validateOutputResourceSupported(ResourceId resourceId) {}
 
   @Override
   public String verifyPath(String path) {

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
index 786cdcb..e18dd96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NumberedShardedFile.java
@@ -26,7 +26,6 @@ import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
@@ -38,6 +37,7 @@ import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -120,7 +120,7 @@ public class NumberedShardedFile implements ShardedFile {
       try {
         // Match inputPath which may contains glob
         Collection<Metadata> files = Iterables.getOnlyElement(
-            FileSystems.match(ImmutableList.of(filePattern))).metadata();
+            FileSystems.match(Collections.singletonList(filePattern))).metadata();
 
         LOG.debug("Found {} file(s) by matching the path: {}", files.size(), filePattern);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
index a7ee16e..e69648b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PathValidator.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.io.fs.ResourceId;
+
 /**
  * Interface for controlling validation of paths.
  */
@@ -25,17 +27,22 @@ public interface PathValidator {
    * Validate that a file pattern is conforming.
    *
    * @param filepattern The file pattern to verify.
-   * @return The post-validation filepattern.
    */
-  String validateInputFilePatternSupported(String filepattern);
+  void validateInputFilePatternSupported(String filepattern);
 
   /**
    * Validate that an output file prefix is conforming.
    *
    * @param filePrefix the file prefix to verify.
-   * @return The post-validation filePrefix.
    */
-  String validateOutputFilePrefixSupported(String filePrefix);
+  void validateOutputFilePrefixSupported(String filePrefix);
+
+  /**
+   * Validates that an output path is conforming.
+   *
+   * @param resourceId the output resource to verify.
+   */
+  void validateOutputResourceSupported(ResourceId resourceId);
 
   /**
    * Validate that a path is a valid path and that the path

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 5991c96..1506aa9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -34,6 +35,8 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -52,9 +55,9 @@ import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -278,33 +281,31 @@ public class AvroIOTest {
   }
 
   private static class WindowedFilenamePolicy extends FilenamePolicy {
-    String outputFilePrefix;
+    final String outputFilePrefix;
 
     WindowedFilenamePolicy(String outputFilePrefix) {
       this.outputFilePrefix = outputFilePrefix;
     }
 
     @Override
-    public ValueProvider<String> getBaseOutputFilenameProvider() {
-      return StaticValueProvider.of(outputFilePrefix);
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext input, String extension) {
+      String filename = String.format(
+          "%s-%s-%s-of-%s-pane-%s%s%s",
+          outputFilePrefix,
+          input.getWindow(),
+          input.getShardNumber(),
+          input.getNumShards() - 1,
+          input.getPaneInfo().getIndex(),
+          input.getPaneInfo().isLast() ? "-final" : "",
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public String windowedFilename(WindowedContext input) {
-      String filename = outputFilePrefix + "-" + input.getWindow().toString() +  "-"
-          + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
-          + input.getPaneInfo().getIndex();
-      if (input.getPaneInfo().isLast()) {
-        filename += "-final";
-      }
-      return filename;
-    }
-
-    @Override
-    public String unwindowedFilename(Context input) {
-      String filename = outputFilePrefix + input.getShardNumber() + "-of-"
-          + (input.getNumShards() - 1);
-      return filename;
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context input, String extension) {
+      throw new UnsupportedOperationException("Expecting windowed outputs only");
     }
 
     @Override
@@ -320,8 +321,8 @@ public class AvroIOTest {
   @Test
   @Category({ValidatesRunner.class, UsesTestStream.class})
   public void testWindowedAvroIOWrite() throws Throwable {
-    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
-    final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+    Path baseDir = Files.createTempDirectory(tmpFolder.getRoot().toPath(), "testwrite");
+    String baseFilename = baseDir.resolve("prefix").toString();
 
     Instant base = new Instant(0);
     ArrayList<GenericClass> allElements = new ArrayList<>();
@@ -349,7 +350,6 @@ public class AvroIOTest {
           secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
     }
 
-
     TimestampedValue<GenericClass>[] firstWindowArray =
         firstWindowElements.toArray(new TimestampedValue[100]);
     TimestampedValue<GenericClass>[] secondWindowArray =
@@ -364,11 +364,13 @@ public class AvroIOTest {
         Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
         .advanceWatermarkToInfinity();
 
+    FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
         .apply(AvroIO.write(GenericClass.class)
-            .to(new WindowedFilenamePolicy(outputFilePrefix))
+            .to(baseFilename)
+            .withFilenamePolicy(policy)
             .withWindowedWrites()
             .withNumShards(2));
     windowedAvroWritePipeline.run();
@@ -381,7 +383,7 @@ public class AvroIOTest {
         IntervalWindow intervalWindow = new IntervalWindow(
             windowStart, Duration.standardMinutes(1));
         expectedFiles.add(
-            new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+            new File(baseFilename + "-" + intervalWindow.toString() + "-" + shard
                 + "-of-1" + "-pane-0-final"));
       }
     }
@@ -442,7 +444,7 @@ public class AvroIOTest {
   @Test
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
-  public void testMetdata() throws Exception {
+  public void testMetadata() throws Exception {
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -481,7 +483,8 @@ public class AvroIOTest {
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);
     p.run();
 
-    String shardNameTemplate = write.getShardTemplate();
+    String shardNameTemplate =
+        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE);
 
     assertTestOutputs(expectedElements, numShards, outputFilePrefix, shardNameTemplate);
   }
@@ -494,7 +497,7 @@ public class AvroIOTest {
     for (int i = 0; i < numShards; i++) {
       expectedFiles.add(
           new File(
-              FileBasedSink.constructName(
+              DefaultFilenamePolicy.constructName(
                   outputFilePrefix, shardNameTemplate, "" /* no suffix */, i, numShards)));
     }
 
@@ -530,10 +533,10 @@ public class AvroIOTest {
 
   @Test
   public void testReadDisplayData() {
-    AvroIO.Read<String> read = AvroIO.read(String.class).from("foo.*");
+    AvroIO.Read<String> read = AvroIO.read(String.class).from("/foo.*");
 
     DisplayData displayData = DisplayData.from(read);
-    assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+    assertThat(displayData, hasDisplayItem("filePattern", "/foo.*"));
   }
 
   @Test
@@ -542,7 +545,7 @@ public class AvroIOTest {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
     AvroIO.Read<GenericRecord> read =
-        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*");
+        AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("/foo.*");
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
     assertThat("AvroIO.Read should include the file pattern in its primitive transform",

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
new file mode 100644
index 0000000..c895da8
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests of {@link DefaultFilenamePolicy}.
+ */
+@RunWith(JUnit4.class)
+public class DefaultFilenamePolicyTest {
+  @Test
+  public void testConstructName() {
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
+
+    assertEquals("out.txt/part-00042",
+        constructName("out.txt", "/part-SSSSS", "", 42, 100));
+
+    assertEquals("out.txt",
+        constructName("ou", "t.t", "xt", 1, 1));
+
+    assertEquals("out0102shard.txt",
+        constructName("out", "SSNNshard", ".txt", 1, 2));
+
+    assertEquals("out-2/1.part-1-of-2.txt",
+        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
+  }
+
+  @Test
+  public void testConstructNameWithLargeShardCount() {
+    assertEquals("out-100-of-5000.txt",
+        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 7efe47c..d9bcef4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,9 +17,10 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.apache.beam.sdk.io.FileBasedSink.constructName;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -37,7 +38,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -52,9 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.junit.Rule;
@@ -64,50 +63,28 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for FileBasedSink.
+ * Tests for {@link FileBasedSink}.
  */
 @RunWith(JUnit4.class)
 public class FileBasedSinkTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
-  private String baseOutputFilename = "output";
-  private String tempDirectory = "temp";
+  private final String tempDirectoryName = "temp";
 
-  private String appendToTempFolder(String filename) {
-    return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
+  private ResourceId getTemporaryFolder() {
+    return LocalResources.fromFile(tmpFolder.getRoot(), /* isDirectory */ true);
   }
 
-  private String getBaseOutputFilename() {
-    return appendToTempFolder(baseOutputFilename);
+  private ResourceId getBaseOutputDirectory() {
+    String baseOutputDirname = "output";
+    return getTemporaryFolder()
+        .resolve(baseOutputDirname, StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
-  private String getBaseTempDirectory() {
-    return appendToTempFolder(tempDirectory);
-  }
-
-  @Test
-  public void testConstructName() {
-    assertEquals("output-001-of-123.txt",
-        constructName("output", "-SSS-of-NNN", ".txt", 1, 123));
-
-    assertEquals("out.txt/part-00042",
-        constructName("out.txt", "/part-SSSSS", "", 42, 100));
-
-    assertEquals("out.txt",
-        constructName("ou", "t.t", "xt", 1, 1));
-
-    assertEquals("out0102shard.txt",
-        constructName("out", "SSNNshard", ".txt", 1, 2));
-
-    assertEquals("out-2/1.part-1-of-2.txt",
-        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2));
-  }
-
-  @Test
-  public void testConstructNameWithLargeShardCount() {
-    assertEquals("out-100-of-5000.txt",
-        constructName("out", "-SS-of-NN", ".txt", 100, 5000));
+  private ResourceId getBaseTempDirectory() {
+    return getTemporaryFolder()
+        .resolve(tempDirectoryName, StandardResolveOptions.RESOLVE_DIRECTORY);
   }
 
   /**
@@ -117,30 +94,31 @@ public class FileBasedSinkTest {
   @Test
   public void testWriter() throws Exception {
     String testUid = "testId";
-    String expectedFilename = IOChannelUtils.resolve(getBaseTempDirectory(), testUid);
-    SimpleSink.SimpleWriter writer = buildWriter();
-
+    ResourceId expectedFile = getBaseTempDirectory()
+        .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
     List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
     List<String> expected = new ArrayList<>();
     expected.add(SimpleSink.SimpleWriter.HEADER);
     expected.addAll(values);
     expected.add(SimpleSink.SimpleWriter.FOOTER);
 
+    SimpleSink.SimpleWriter writer =
+        buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
     writer.openUnwindowed(testUid, -1, -1);
     for (String value : values) {
       writer.write(value);
     }
     FileResult result = writer.close();
 
-    assertEquals(expectedFilename, result.getFilename());
-    assertFileContains(expected, expectedFilename);
+    assertEquals(expectedFile, result.getFilename());
+    assertFileContains(expected, expectedFile);
   }
 
   /**
    * Assert that a file contains the lines provided, in the same order as expected.
    */
-  private void assertFileContains(List<String> expected, String filename) throws Exception {
-    try (BufferedReader reader = new BufferedReader(new FileReader(filename))) {
+  private void assertFileContains(List<String> expected, ResourceId file) throws Exception {
+    try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
       List<String> actual = new ArrayList<>();
       for (;;) {
         String line = reader.readLine();
@@ -149,7 +127,7 @@ public class FileBasedSinkTest {
         }
         actual.add(line);
       }
-      assertEquals(expected, actual);
+      assertEquals("contents for " + file, expected, actual);
     }
   }
 
@@ -165,19 +143,11 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Removes temporary files when temporary and output filenames differ.
+   * Removes temporary files when temporary and output directories differ.
    */
   @Test
   public void testRemoveWithTempFilename() throws Exception {
-    testRemoveTemporaryFiles(3, tempDirectory);
-  }
-
-  /**
-   * Removes only temporary files, even if temporary and output files share the same base filename.
-   */
-  @Test
-  public void testRemoveWithSameFilename() throws Exception {
-    testRemoveTemporaryFiles(3, baseOutputFilename);
+    testRemoveTemporaryFiles(3, getBaseTempDirectory());
   }
 
   /**
@@ -205,13 +175,13 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testFinalizeWithIntermediateState() throws Exception {
-    List<File> files = generateTemporaryFilesForFinalize(3);
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    List<File> files = generateTemporaryFilesForFinalize(3);
     runFinalize(writeOp, files);
 
-    // create a temporary file
-    tmpFolder.newFolder(tempDirectory);
-    tmpFolder.newFile(tempDirectory + "/1");
+    // create a temporary file and then rerun finalize
+    tmpFolder.newFolder(tempDirectoryName);
+    tmpFolder.newFile(tempDirectoryName + "/1");
 
     runFinalize(writeOp, files);
   }
@@ -222,9 +192,9 @@ public class FileBasedSinkTest {
   private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
     List<File> temporaryFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      String temporaryFilename =
-          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, "" + i);
-      File tmpFile = new File(tmpFolder.getRoot(), temporaryFilename);
+      ResourceId temporaryFile =
+          FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i);
+      File tmpFile = new File(tmpFolder.getRoot(), temporaryFile.toString());
       tmpFile.getParentFile().mkdirs();
       assertTrue(tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
@@ -238,26 +208,26 @@ public class FileBasedSinkTest {
    */
   private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
       throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-
     int numFiles = temporaryFiles.size();
 
     List<FileResult> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
-    for (int i = 0; i < numFiles; i++) {
-      fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
+    for (File f : temporaryFiles) {
+      ResourceId file = LocalResources.fromFile(f, false);
+      fileResults.add(new FileResult(file, null));
     }
 
-    writeOp.finalize(fileResults, options);
+    writeOp.finalize(fileResults);
 
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
     for (int i = 0; i < numFiles; i++) {
-      String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
-          new Context(i, numFiles));
-      assertTrue(new File(outputFilename).exists());
+      ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
+          .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
+      assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
-    assertFalse(new File(writeOp.tempDirectory.get()).exists());
+    assertFalse(new File(writeOp.tempDirectory.get().toString()).exists());
     // Test that repeated requests of the temp directory return a stable result.
     assertEquals(writeOp.tempDirectory.get(), writeOp.tempDirectory.get());
   }
@@ -266,28 +236,43 @@ public class FileBasedSinkTest {
    * Create n temporary and output files and verify that removeTemporaryFiles only
    * removes temporary files.
    */
-  private void testRemoveTemporaryFiles(int numFiles, String baseTemporaryFilename)
+  private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation(baseTemporaryFilename);
+    String prefix = "file";
+    SimpleSink sink =
+        new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
+
+    FileBasedWriteOperation<String> writeOp =
+        new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      File tmpFile = new File(tmpFolder.getRoot(),
-          FileBasedWriteOperation.buildTemporaryFilename(baseTemporaryFilename, "" + i));
+      ResourceId tempResource =
+          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+      File tmpFile = new File(tempResource.toString());
       tmpFile.getParentFile().mkdirs();
-      assertTrue(tmpFile.createNewFile());
+      assertTrue("not able to create new temp file", tmpFile.createNewFile());
       temporaryFiles.add(tmpFile);
-      File outputFile = tmpFolder.newFile(baseOutputFilename + i);
+      ResourceId outputFileId =
+          getBaseOutputDirectory().resolve(prefix + i, StandardResolveOptions.RESOLVE_FILE);
+      File outputFile = new File(outputFileId.toString());
+      outputFile.getParentFile().mkdirs();
+      assertTrue("not able to create new output file", outputFile.createNewFile());
       outputFiles.add(outputFile);
     }
 
-    writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
+    writeOp.removeTemporaryFiles(Collections.<ResourceId>emptySet(), true);
 
     for (int i = 0; i < numFiles; i++) {
-      assertFalse(temporaryFiles.get(i).exists());
-      assertTrue(outputFiles.get(i).exists());
+      File temporaryFile = temporaryFiles.get(i);
+      assertThat(
+          String.format("temp file %s exists", temporaryFile),
+          temporaryFile.exists(), is(false));
+      File outputFile = outputFiles.get(i);
+      assertThat(
+          String.format("output file %s exists", outputFile),
+          outputFile.exists(), is(true));
     }
   }
 
@@ -296,111 +281,79 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testCopyToOutputFiles() throws Exception {
-    PipelineOptions options = PipelineOptionsFactory.create();
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
 
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
-        "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
+        "file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
 
-    Map<String, String> inputFilePaths = new HashMap<>();
-    List<String> expectedOutputPaths = new ArrayList<>();
+    Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
+    List<ResourceId> expectedOutputPaths = new ArrayList<>();
 
     for (int i = 0; i < inputFilenames.size(); i++) {
       // Generate output paths.
-      File outputFile = tmpFolder.newFile(expectedOutputFilenames.get(i));
-      expectedOutputPaths.add(outputFile.toString());
+      expectedOutputPaths.add(
+          getBaseOutputDirectory()
+              .resolve(expectedOutputFilenames.get(i), StandardResolveOptions.RESOLVE_FILE));
 
       // Generate and write to input paths.
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
-      List<String> lines = Arrays.asList(inputContents.get(i));
+      List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(inputTmpFile.toString(),
-          writeOp.getSink().getFileNamePolicy().unwindowedFilename(
-              new Context(i, inputFilenames.size())));
+      inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
+          writeOp.getSink().getFilenamePolicy()
+              .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
     }
 
     // Copy input files to output files.
-    writeOp.copyToOutputFiles(inputFilePaths, options);
+    writeOp.copyToOutputFiles(inputFilePaths);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
-      assertFileContains(Arrays.asList(inputContents.get(i)), expectedOutputPaths.get(i));
+      assertFileContains(
+          Collections.singletonList(inputContents.get(i)), expectedOutputPaths.get(i));
     }
   }
 
-  public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
-    List<String> filenames = new ArrayList<>();
+  public List<ResourceId> generateDestinationFilenames(
+      ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
+    List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+      filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
     }
     return filenames;
   }
 
   /**
-   * Output filenames use the supplied naming template.
-   */
-  @Test
-  public void testGenerateOutputFilenamesWithTemplate() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
-    FilenamePolicy policy = sink.getFileNamePolicy();
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
-        appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = generateDestinationFilenames(policy, 3);
-    assertEquals(expected, actual);
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = generateDestinationFilenames(policy, 1);
-    assertEquals(expected, actual);
-
-    expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
-    assertEquals(expected, actual);
-
-    // Also validate that we handle the case where the user specified "." that we do
-    // not prefix an additional "." making "..test"
-    sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
-    expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
-        appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = generateDestinationFilenames(policy, 3);
-    assertEquals(expected, actual);
-
-    expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = generateDestinationFilenames(policy, 1);
-    assertEquals(expected, actual);
-
-    expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
-    assertEquals(expected, actual);
-  }
-
-  /**
    * Output filenames are generated correctly when an extension is supplied.
    */
   @Test
-  public void testGenerateOutputFilenamesWithExtension() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
+  public void testGenerateOutputFilenames() {
+    List<ResourceId> expected;
+    List<ResourceId> actual;
+    ResourceId root = getBaseOutputDirectory();
+
+    SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
+    FilenamePolicy policy = sink.getFilenamePolicy();
 
     expected = Arrays.asList(
-        appendToTempFolder("output-00000-of-00003.test"),
-        appendToTempFolder("output-00001-of-00003.test"),
-        appendToTempFolder("output-00002-of-00003.test"));
-    actual = generateDestinationFilenames(policy, 3);
+        root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
-    actual = generateDestinationFilenames(policy, 1);
+    expected = Collections.singletonList(
+        root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
+    actual = generateDestinationFilenames(root, policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -408,16 +361,21 @@ public class FileBasedSinkTest {
    * Reject non-distinct output filenames.
    */
   @Test
-  public void testCollidingOutputFilenames() {
-    SimpleSink sink = new SimpleSink("output", "test", "-NN");
+  public void testCollidingOutputFilenames() throws IOException {
+    ResourceId root = getBaseOutputDirectory();
+    SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
     SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
 
+    ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
+    ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
       Iterable<FileResult> results = Lists.newArrayList(
-          new FileResult("temp1", "file1"),
-          new FileResult("temp2", "file1"),
-          new FileResult("temp3", "file1"));
+          new FileResult(temp1, output),
+          new FileResult(temp2, output),
+          new FileResult(temp3, output));
 
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
@@ -432,22 +390,28 @@ public class FileBasedSinkTest {
    */
   @Test
   public void testGenerateOutputFilenamesWithoutExtension() {
-    List<String> expected;
-    List<String> actual;
-    SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
-    FilenamePolicy policy = sink.getFileNamePolicy();
-
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
-        appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
-    actual = generateDestinationFilenames(policy, 3);
+    List<ResourceId> expected;
+    List<ResourceId> actual;
+    ResourceId root = getBaseOutputDirectory();
+    SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
+    FilenamePolicy policy = sink.getFilenamePolicy();
+
+    expected = Arrays.asList(
+        root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
+        root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 3);
     assertEquals(expected, actual);
 
-    expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
-    actual = generateDestinationFilenames(policy, 1);
+    expected = Collections.singletonList(
+        root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE)
+    );
+    actual = generateDestinationFilenames(root, policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(policy, 0);
+    actual = generateDestinationFilenames(root, policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -511,7 +475,7 @@ public class FileBasedSinkTest {
 
   private File writeValuesWithWritableByteChannelFactory(final WritableByteChannelFactory factory,
       String... values)
-      throws IOException, FileNotFoundException {
+      throws IOException {
     final File file = tmpFolder.newFile("test.gz");
     final WritableByteChannel channel =
         factory.create(Channels.newChannel(new FileOutputStream(file)));
@@ -529,12 +493,13 @@ public class FileBasedSinkTest {
   @Test
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
-    SimpleSink.SimpleWriteOperation writeOp =
-        new SimpleSink(getBaseOutputFilename(), "txt", new DrunkWritableByteChannelFactory())
+    ResourceId root = getBaseOutputDirectory();
+    FileBasedWriteOperation<String> writeOp =
+        new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final FileBasedWriter<String> writer =
-        writeOp.createWriter(null);
-    final String expectedFilename = IOChannelUtils.resolve(writeOp.tempDirectory.get(), testUid);
+    final FileBasedWriter<String> writer = writeOp.createWriter();
+    final ResourceId expectedFile =
+        writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 
     final List<String> expected = new ArrayList<>();
     expected.add("header");
@@ -551,38 +516,29 @@ public class FileBasedSinkTest {
     writer.write("b");
     final FileResult result = writer.close();
 
-    assertEquals(expectedFilename, result.getFilename());
-    assertFileContains(expected, expectedFilename);
+    assertEquals(expectedFile, result.getFilename());
+    assertFileContains(expected, expectedFile);
   }
 
   /**
    * Build a SimpleSink with default options.
    */
   private SimpleSink buildSink() {
-    return new SimpleSink(getBaseOutputFilename(), "test");
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
   }
 
   /**
-   * Build a SimpleWriteOperation with default options and the given base temporary filename.
+   * Build a SimpleWriteOperation with default options and the given temporary directory.
    */
-  private SimpleSink.SimpleWriteOperation buildWriteOperation(String baseTemporaryFilename) {
+  private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
     SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, appendToTempFolder(baseTemporaryFilename));
+    return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
   }
 
   /**
    * Build a write operation with the default options for it and its parent sink.
    */
   private SimpleSink.SimpleWriteOperation buildWriteOperation() {
-    SimpleSink sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation(sink, getBaseTempDirectory());
-  }
-
-  /**
-   * Build a writer with the default options for its parent write operation and sink.
-   */
-  private SimpleSink.SimpleWriter buildWriter() {
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
-    return new SimpleSink.SimpleWriter(writeOp);
+    return buildSink().createWriteOperation();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index f83642a..9265520 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,24 +19,25 @@ package org.apache.beam.sdk.io;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /**
- * A simple FileBasedSink that writes String values as lines with header and footer lines.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with
+ * header and footer.
  */
 class SimpleSink extends FileBasedSink<String> {
-  public SimpleSink(String baseOutputFilename, String extension) {
-    super(baseOutputFilename, extension);
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
+    this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
   }
 
-  public SimpleSink(String baseOutputFilename, String extension,
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
                     WritableByteChannelFactory writableByteChannelFactory) {
-    super(baseOutputFilename, extension, writableByteChannelFactory);
-  }
-
-  public SimpleSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    super(baseOutputFilename, extension, fileNamingTemplate);
+    super(
+        StaticValueProvider.of(baseOutputDirectory),
+        new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
+        writableByteChannelFactory);
   }
 
   @Override
@@ -45,8 +46,8 @@ class SimpleSink extends FileBasedSink<String> {
   }
 
   static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
-    public SimpleWriteOperation(SimpleSink sink, String tempOutputFilename) {
-      super(sink, tempOutputFilename);
+    public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
+      super(sink, tempOutputDirectory);
     }
 
     public SimpleWriteOperation(SimpleSink sink) {
@@ -54,7 +55,7 @@ class SimpleSink extends FileBasedSink<String> {
     }
 
     @Override
-    public SimpleWriter createWriter(PipelineOptions options) throws Exception {
+    public SimpleWriter createWriter() throws Exception {
       return new SimpleWriter(this);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/17358248/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 66b605f..685da82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io;
 
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.sdk.TestUtils.LINES2_ARRAY;
 import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
@@ -28,7 +29,6 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
-import static org.apache.beam.sdk.util.IOChannelUtils.resolve;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -62,6 +62,7 @@ import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPOutputStream;
@@ -73,6 +74,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -80,19 +83,16 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -101,7 +101,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for TextIO Read and Write transforms.
+ * Tests for {@link TextIO} {@link TextIO.Read} and {@link TextIO.Write} transforms.
  */
 // TODO: Change the tests to use ValidatesRunner instead of NeedsRunner
 @RunWith(JUnit4.class)
@@ -168,7 +168,6 @@ public class TextIOTest {
 
   @BeforeClass
   public static void setupClass() throws IOException {
-    IOChannelUtils.registerIOFactoriesAllowOverride(TestPipeline.testingPipelineOptions());
     tempFolder = Files.createTempDirectory("TextIOTest");
     // empty files
     emptyTxt = writeToFile(EMPTY, "empty.txt", CompressionType.UNCOMPRESSED);
@@ -314,7 +313,7 @@ public class TextIOTest {
     p.run();
 
     assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        write.getShardTemplate());
+        firstNonNull(write.getShardTemplate(), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE));
   }
 
   public static void assertOutputFiles(
@@ -328,17 +327,18 @@ public class TextIOTest {
       throws Exception {
     List<File> expectedFiles = new ArrayList<>();
     if (numShards == 0) {
-      String pattern =
-          resolve(rootLocation.toAbsolutePath().toString(), outputName + "*");
-      for (String expected : IOChannelUtils.getFactory(pattern).match(pattern)) {
-        expectedFiles.add(new File(expected));
+      String pattern = rootLocation.toAbsolutePath().resolve(outputName + "*").toString();
+      List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
+      for (Metadata expectedFile : Iterables.getOnlyElement(matches).metadata()) {
+        expectedFiles.add(new File(expectedFile.resourceId().toString()));
       }
     } else {
       for (int i = 0; i < numShards; i++) {
         expectedFiles.add(
             new File(
                 rootLocation.toString(),
-                FileBasedSink.constructName(outputName, shardNameTemplate, "", i, numShards)));
+                DefaultFilenamePolicy.constructName(
+                    outputName, shardNameTemplate, "", i, numShards)));
       }
     }
 
@@ -483,7 +483,7 @@ public class TextIOTest {
   @Test
   public void testWriteDisplayData() {
     TextIO.Write write = TextIO.write()
-        .to("foo")
+        .to("/foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
         .withNumShards(100)
@@ -492,7 +492,7 @@ public class TextIOTest {
 
     DisplayData displayData = DisplayData.from(write);
 
-    assertThat(displayData, hasDisplayItem("filePrefix", "foo"));
+    assertThat(displayData, hasDisplayItem("filePrefix", "/foo"));
     assertThat(displayData, hasDisplayItem("fileSuffix", "bar"));
     assertThat(displayData, hasDisplayItem("fileHeader", "myHeader"));
     assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
@@ -523,23 +523,6 @@ public class TextIOTest {
     assertThat(displayData, hasDisplayItem("fileFooter", "myFooter"));
   }
 
-  @Test
-  @Category(ValidatesRunner.class)
-  @Ignore("[BEAM-436] DirectRunner ValidatesRunner tempLocation configuration insufficient")
-  public void testPrimitiveWriteDisplayData() throws IOException {
-    PipelineOptions options = DisplayDataEvaluator.getDefaultOptions();
-    String tempRoot = options.as(TestPipelineOptions.class).getTempRoot();
-    String outputPath = IOChannelUtils.getFactory(tempRoot).resolve(tempRoot, "foobar");
-
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-
-    TextIO.Write write = TextIO.write().to(outputPath);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("TextIO.Write should include the file prefix in its primitive display data",
-        displayData, hasItem(hasDisplayItem(hasValue(startsWith(outputPath)))));
-  }
-
   /** Options for testing. */
   public interface RuntimeTestOptions extends PipelineOptions {
     ValueProvider<String> getInput();