Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:45 UTC

[18/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 583af60..8102316 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -33,7 +33,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -50,10 +49,8 @@ import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -76,7 +73,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
 import org.joda.time.Instant;
@@ -86,43 +82,43 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Abstract class for file-based output. An implementation of FileBasedSink writes file-based output
- * and defines the format of output files (how values are written, headers/footers, MIME type,
- * etc.).
+ * Abstract class for file-based output. An implementation of FileBasedSink writes file-based
+ * output and defines the format of output files (how values are written, headers/footers, MIME
+ * type, etc.).
  *
  * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
  * and to create a {@link WriteOperation} that manages the process of writing to the sink.
  *
  * <p>The process of writing to file-based sink is as follows:
- *
  * <ol>
- *   <li>An optional subclass-defined initialization,
- *   <li>a parallel write of bundles to temporary files, and finally,
- *   <li>these temporary files are renamed with final output filenames.
+ * <li>An optional subclass-defined initialization,
+ * <li>a parallel write of bundles to temporary files, and finally,
+ * <li>these temporary files are renamed with final output filenames.
  * </ol>
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called
+ * by the WriteFiles transform, so even redundant or retried bundles will have a unique way of
+ * identifying
+ * their output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
  * will encode the unique bundle id to avoid conflicts with other writers.
  *
- * <p>{@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
- * filenames, and this policy object can be used to write windowed or triggered PCollections into
- * separate files per window pane. This allows file output from unbounded PCollections, and also
- * works for bounded PCollecctions.
+ * {@link FileBasedSink} can take a custom {@link FilenamePolicy} object to determine output
+ * filenames, and this policy object can be used to write windowed or triggered
+ * PCollections into separate files per window pane. This allows file output from unbounded
+ * PCollections, and also works for bounded PCollecctions.
  *
  * <p>Supported file systems are those registered with {@link FileSystems}.
  *
- * @param <OutputT> the type of values written to the sink.
+ * @param <T> the type of values written to the sink.
  */
 @Experimental(Kind.FILESYSTEM)
-public abstract class FileBasedSink<OutputT, DestinationT> implements Serializable, HasDisplayData {
+public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
 
   /**
@@ -177,7 +173,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     @Override
-    public String getSuggestedFilenameSuffix() {
+    public String getFilenameSuffix() {
       return filenameSuffix;
     }
 
@@ -209,8 +205,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
   }
 
-  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
-
   /**
    * The {@link WritableByteChannelFactory} that is used to wrap the raw data output to the
    * underlying channel. The default is to not compress the output using
@@ -219,70 +213,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   private final WritableByteChannelFactory writableByteChannelFactory;
 
   /**
-   * A class that allows value-dependent writes in {@link FileBasedSink}.
-   *
-   * <p>Users can define a custom type to represent destinations, and provide a mapping to turn this
-   * destination type into an instance of {@link FilenamePolicy}.
+   * A naming policy for output files.
    */
-  @Experimental(Kind.FILESYSTEM)
-  public abstract static class DynamicDestinations<UserT, DestinationT>
-      implements HasDisplayData, Serializable {
-    /**
-     * Returns an object that represents at a high level the destination being written to. May not
-     * return null.
-     */
-    public abstract DestinationT getDestination(UserT element);
-
-    /**
-     * Returns the default destination. This is used for collections that have no elements as the
-     * destination to write empty files to.
-     */
-    public abstract DestinationT getDefaultDestination();
-
-    /**
-     * Returns the coder for {@link DestinationT}. If this is not overridden, then the coder
-     * registry will be use to find a suitable coder. This must be a deterministic coder, as {@link
-     * DestinationT} will be used as a key type in a {@link
-     * org.apache.beam.sdk.transforms.GroupByKey}.
-     */
-    @Nullable
-    public Coder<DestinationT> getDestinationCoder() {
-      return null;
-    }
-
-    /** Converts a destination into a {@link FilenamePolicy}. May not return null. */
-    public abstract FilenamePolicy getFilenamePolicy(DestinationT destination);
-
-    /** Populates the display data. */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {}
-
-    // Gets the destination coder. If the user does not provide one, try to find one in the coder
-    // registry. If no coder can be found, throws CannotProvideCoderException.
-    final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
-        throws CannotProvideCoderException {
-      Coder<DestinationT> destinationCoder = getDestinationCoder();
-      if (destinationCoder != null) {
-        return destinationCoder;
-      }
-      // If dynamicDestinations doesn't provide a coder, try to find it in the coder registry.
-      // We must first use reflection to figure out what the type parameter is.
-      TypeDescriptor<?> superDescriptor =
-          TypeDescriptor.of(getClass()).getSupertype(DynamicDestinations.class);
-      if (!superDescriptor.getRawType().equals(DynamicDestinations.class)) {
-        throw new AssertionError(
-            "Couldn't find the DynamicDestinations superclass of " + this.getClass());
-      }
-      TypeVariable typeVariable = superDescriptor.getTypeParameter("DestinationT");
-      @SuppressWarnings("unchecked")
-      TypeDescriptor<DestinationT> descriptor =
-          (TypeDescriptor<DestinationT>) superDescriptor.resolveType(typeVariable);
-      return registry.getCoder(descriptor);
-    }
-  }
-
-  /** A naming policy for output files. */
-  @Experimental(Kind.FILESYSTEM)
   public abstract static class FilenamePolicy implements Serializable {
     /**
      * Context used for generating a name based on shard number, and num shards.
@@ -355,28 +287,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     /**
      * When a sink has requested windowed or triggered output, this method will be invoked to return
      * the file {@link ResourceId resource} to be created given the base output directory and a
-     * {@link OutputFileHints} containing information about the file, including a suggested
-     * extension (e.g. coming from {@link CompressionType}).
+     * (possibly empty) extension from {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
      *
-     * <p>The {@link WindowedContext} object gives access to the window and pane, as well as
-     * sharding information. The policy must return unique and consistent filenames for different
-     * windows and panes.
+     * <p>The {@link WindowedContext} object gives access to the window and pane,
+     * as well as sharding information. The policy must return unique and consistent filenames
+     * for different windows and panes.
      */
     @Experimental(Kind.FILESYSTEM)
-    public abstract ResourceId windowedFilename(WindowedContext c, OutputFileHints outputFileHints);
+    public abstract ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext c, String extension);
 
     /**
      * When a sink has not requested windowed or triggered output, this method will be invoked to
      * return the file {@link ResourceId resource} to be created given the base output directory and
-     * a {@link OutputFileHints} containing information about the file, including a suggested (e.g.
-     * coming from {@link CompressionType}).
+     * a (possibly empty) extension applied by additional {@link FileBasedSink} configuration
+     * (e.g., {@link CompressionType}).
      *
      * <p>The {@link Context} object only provides sharding information, which is used by the policy
      * to generate unique and consistent filenames.
      */
     @Experimental(Kind.FILESYSTEM)
-    @Nullable
-    public abstract ResourceId unwindowedFilename(Context c, OutputFileHints outputFileHints);
+    @Nullable public abstract ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context c, String extension);
 
     /**
      * Populates the display data.
@@ -385,8 +318,19 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
   }
 
+  /** The policy used to generate names of files to be produced. */
+  private final FilenamePolicy filenamePolicy;
   /** The directory to which files will be written. */
-  private final ValueProvider<ResourceId> tempDirectoryProvider;
+  private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
+
+  /**
+   * Construct a {@link FileBasedSink} with the given filename policy, producing uncompressed files.
+   */
+  @Experimental(Kind.FILESYSTEM)
+  public FileBasedSink(
+      ValueProvider<ResourceId> baseOutputDirectoryProvider, FilenamePolicy filenamePolicy) {
+    this(baseOutputDirectoryProvider, filenamePolicy, CompressionType.UNCOMPRESSED);
+  }
 
   private static class ExtractDirectory implements SerializableFunction<ResourceId, ResourceId> {
     @Override
@@ -396,91 +340,95 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
   }
 
   /**
-   * Construct a {@link FileBasedSink} with the given temp directory, producing uncompressed files.
+   * Construct a {@link FileBasedSink} with the given filename policy and output channel type.
    */
   @Experimental(Kind.FILESYSTEM)
   public FileBasedSink(
-      ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations) {
-    this(tempDirectoryProvider, dynamicDestinations, CompressionType.UNCOMPRESSED);
-  }
-
-  /** Construct a {@link FileBasedSink} with the given temp directory and output channel type. */
-  @Experimental(Kind.FILESYSTEM)
-  public FileBasedSink(
-      ValueProvider<ResourceId> tempDirectoryProvider,
-      DynamicDestinations<?, DestinationT> dynamicDestinations,
+      ValueProvider<ResourceId> baseOutputDirectoryProvider,
+      FilenamePolicy filenamePolicy,
       WritableByteChannelFactory writableByteChannelFactory) {
-    this.tempDirectoryProvider =
-        NestedValueProvider.of(tempDirectoryProvider, new ExtractDirectory());
-    this.dynamicDestinations = checkNotNull(dynamicDestinations);
+    this.baseOutputDirectoryProvider =
+        NestedValueProvider.of(baseOutputDirectoryProvider, new ExtractDirectory());
+    this.filenamePolicy = filenamePolicy;
     this.writableByteChannelFactory = writableByteChannelFactory;
   }
 
-  /** Return the {@link DynamicDestinations} used. */
-  @SuppressWarnings("unchecked")
-  public <UserT> DynamicDestinations<UserT, DestinationT> getDynamicDestinations() {
-    return (DynamicDestinations<UserT, DestinationT>) dynamicDestinations;
+  /**
+   * Returns the base directory inside which files will be written according to the configured
+   * {@link FilenamePolicy}.
+   */
+  @Experimental(Kind.FILESYSTEM)
+  public ValueProvider<ResourceId> getBaseOutputDirectoryProvider() {
+    return baseOutputDirectoryProvider;
   }
 
   /**
-   * Returns the directory inside which temprary files will be written according to the configured
-   * {@link FilenamePolicy}.
+   * Returns the policy by which files will be named inside of the base output directory. Note that
+   * the {@link FilenamePolicy} may itself specify one or more inner directories before each output
+   * file, say when writing windowed outputs in a {@code output/YYYY/MM/DD/file.txt} format.
    */
   @Experimental(Kind.FILESYSTEM)
-  public ValueProvider<ResourceId> getTempDirectoryProvider() {
-    return tempDirectoryProvider;
+  public final FilenamePolicy getFilenamePolicy() {
+    return filenamePolicy;
   }
 
   public void validate(PipelineOptions options) {}
 
-  /** Return a subclass of {@link WriteOperation} that will manage the write to the sink. */
-  public abstract WriteOperation<OutputT, DestinationT> createWriteOperation();
+  /**
+   * Return a subclass of {@link WriteOperation} that will manage the write
+   * to the sink.
+   */
+  public abstract WriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
-    getDynamicDestinations().populateDisplayData(builder);
+    getFilenamePolicy().populateDisplayData(builder);
   }
 
   /**
    * Abstract operation that manages the process of writing to {@link FileBasedSink}.
    *
-   * <p>The primary responsibilities of the WriteOperation is the management of output files. During
-   * a write, {@link Writer}s write bundles to temporary file locations. After the bundles have been
-   * written,
-   *
+   * <p>The primary responsibilities of the WriteOperation is the management of output
+   * files. During a write, {@link Writer}s write bundles to temporary file
+   * locations. After the bundles have been written,
    * <ol>
-   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
-   *       output bundles.
-   *   <li>During finalize, these temporary files are copied to final output locations and named
-   *       according to a file naming template.
-   *   <li>Finally, any temporary files that were created during the write are removed.
+   * <li>{@link WriteOperation#finalize} is given a list of the temporary
+   * files containing the output bundles.
+   * <li>During finalize, these temporary files are copied to final output locations and named
+   * according to a file naming template.
+   * <li>Finally, any temporary files that were created during the write are removed.
    * </ol>
    *
-   * <p>Subclass implementations of WriteOperation must implement {@link
-   * WriteOperation#createWriter} to return a concrete FileBasedSinkWriter.
+   * <p>Subclass implementations of WriteOperation must implement
+   * {@link WriteOperation#createWriter} to return a concrete
+   * FileBasedSinkWriter.
    *
-   * <h2>Temporary and Output File Naming:</h2>
+   * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
+   * files using the tempDirectory that can be provided via the constructor of
+   * WriteOperation. These temporary files will be named
+   * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
+   * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
+   * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
    *
-   * <p>During the write, bundles are written to temporary files using the tempDirectory that can be
-   * provided via the constructor of WriteOperation. These temporary files will be named {@code
-   * {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle. For example, if
-   * tempDirectory is "gs://my-bucket/my_temp_output", the output for a bundle with bundle id 15723
-   * will be "gs://my-bucket/my_temp_output/15723".
+   * <p>Final output files are written to baseOutputFilename with the format
+   * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
+   * written and extension is the file extension. Both baseOutputFilename and extension are required
+   * constructor arguments.
    *
-   * <p>Final output files are written to the location specified by the {@link FilenamePolicy}. If
-   * no filename policy is specified, then the {@link DefaultFilenamePolicy} will be used. The
-   * directory that the files are written to is determined by the {@link FilenamePolicy} instance.
+   * <p>Subclass implementations can change the file naming template by supplying a value for
+   * fileNamingTemplate.
    *
    * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
    * files will occur.
    *
    * <p>If there are no elements in the PCollection being written, no output will be generated.
    *
-   * @param <OutputT> the type of values written to the sink.
+   * @param <T> the type of values written to the sink.
    */
-  public abstract static class WriteOperation<OutputT, DestinationT> implements Serializable {
-    /** The Sink that this WriteOperation will write to. */
-    protected final FileBasedSink<OutputT, DestinationT> sink;
+  public abstract static class WriteOperation<T> implements Serializable {
+    /**
+     * The Sink that this WriteOperation will write to.
+     */
+    protected final FileBasedSink<T> sink;
 
     /** Directory for temporary output files. */
     protected final ValueProvider<ResourceId> tempDirectory;
@@ -497,19 +445,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     /**
-     * Constructs a WriteOperation using the default strategy for generating a temporary directory
-     * from the base output filename.
+     * Constructs a WriteOperation using the default strategy for generating a temporary
+     * directory from the base output filename.
      *
-     * <p>Default is a uniquely named subdirectory of the provided tempDirectory, e.g. if
-     * tempDirectory is /path/to/foo/, the temporary directory will be
-     * /path/to/foo/temp-beam-foo-$date.
+     * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename is
+     * /path/to/foo, the temporary directory will be /path/to/temp-beam-foo-$date.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink) {
-      this(
-          sink,
-          NestedValueProvider.of(sink.getTempDirectoryProvider(), new TemporaryDirectoryBuilder()));
+    public WriteOperation(FileBasedSink<T> sink) {
+      this(sink, NestedValueProvider.of(
+          sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
     }
 
     private static class TemporaryDirectoryBuilder
@@ -525,12 +471,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       private final Long tempId = TEMP_COUNT.getAndIncrement();
 
       @Override
-      public ResourceId apply(ResourceId tempDirectory) {
+      public ResourceId apply(ResourceId baseOutputDirectory) {
         // Temp directory has a timestamp and a unique ID
         String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
-        return tempDirectory
-            .getCurrentDirectory()
-            .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
+        return baseOutputDirectory.resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
       }
     }
 
@@ -541,22 +485,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * @param tempDirectory the base directory to be used for temporary output files.
      */
     @Experimental(Kind.FILESYSTEM)
-    public WriteOperation(FileBasedSink<OutputT, DestinationT> sink, ResourceId tempDirectory) {
+    public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
     private WriteOperation(
-        FileBasedSink<OutputT, DestinationT> sink, ValueProvider<ResourceId> tempDirectory) {
+        FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
       this.windowedWrites = false;
     }
 
     /**
-     * Clients must implement to return a subclass of {@link Writer}. This method must not mutate
-     * the state of the object.
+     * Clients must implement to return a subclass of {@link Writer}. This
+     * method must not mutate the state of the object.
      */
-    public abstract Writer<OutputT, DestinationT> createWriter() throws Exception;
+    public abstract Writer<T> createWriter() throws Exception;
 
     /**
      * Indicates that the operation will be performing windowed writes.
@@ -570,8 +514,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * removing temporary files.
      *
      * <p>Finalization may be overridden by subclass implementations to perform customized
-     * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
-     * writerResults} contains the filenames of written bundles.
+     * finalization (e.g., initiating some operation on output bundles, merging them, etc.).
+     * {@code writerResults} contains the filenames of written bundles.
      *
      * <p>If subclasses override this method, they must guarantee that its implementation is
      * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
@@ -579,7 +523,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      *
      * @param writerResults the results of writes (FileResult).
      */
-    public void finalize(Iterable<FileResult<DestinationT>> writerResults) throws Exception {
+    public void finalize(Iterable<FileResult> writerResults) throws Exception {
       // Collect names of temporary files and rename them.
       Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
       copyToOutputFiles(outputFilenames);
@@ -598,14 +542,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
 
     @Experimental(Kind.FILESYSTEM)
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
-        Iterable<FileResult<DestinationT>> writerResults) {
+        Iterable<FileResult> writerResults) {
       int numShards = Iterables.size(writerResults);
       Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
 
+      FilenamePolicy policy = getSink().getFilenamePolicy();
+      ResourceId baseOutputDir = getSink().getBaseOutputDirectoryProvider().get();
+
       // Either all results have a shard number set (if the sink is configured with a fixed
       // number of shards), or they all don't (otherwise).
       Boolean isShardNumberSetEverywhere = null;
-      for (FileResult<DestinationT> result : writerResults) {
+      for (FileResult result : writerResults) {
         boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
         if (isShardNumberSetEverywhere == null) {
           isShardNumberSetEverywhere = isShardNumberSetHere;
@@ -621,7 +568,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
         isShardNumberSetEverywhere = true;
       }
 
-      List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
+      List<FileResult> resultsWithShardNumbers = Lists.newArrayList();
       if (isShardNumberSetEverywhere) {
         resultsWithShardNumbers = Lists.newArrayList(writerResults);
       } else {
@@ -630,32 +577,29 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
         // case of triggers, the list of FileResult objects in the Finalize iterable is not
         // deterministic, and might change over retries. This breaks the assumption below that
         // sorting the FileResult objects provides idempotency.
-        List<FileResult<DestinationT>> sortedByTempFilename =
+        List<FileResult> sortedByTempFilename =
             Ordering.from(
-                    new Comparator<FileResult<DestinationT>>() {
-                      @Override
-                      public int compare(
-                          FileResult<DestinationT> first, FileResult<DestinationT> second) {
-                        String firstFilename = first.getTempFilename().toString();
-                        String secondFilename = second.getTempFilename().toString();
-                        return firstFilename.compareTo(secondFilename);
-                      }
-                    })
+                new Comparator<FileResult>() {
+                  @Override
+                  public int compare(FileResult first, FileResult second) {
+                    String firstFilename = first.getTempFilename().toString();
+                    String secondFilename = second.getTempFilename().toString();
+                    return firstFilename.compareTo(secondFilename);
+                  }
+                })
                 .sortedCopy(writerResults);
         for (int i = 0; i < sortedByTempFilename.size(); i++) {
           resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
         }
       }
 
-      for (FileResult<DestinationT> result : resultsWithShardNumbers) {
+      for (FileResult result : resultsWithShardNumbers) {
         checkArgument(
             result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
         outputFilenames.put(
             result.getTempFilename(),
             result.getDestinationFile(
-                getSink().getDynamicDestinations(),
-                numShards,
-                getSink().getWritableByteChannelFactory()));
+                policy, baseOutputDir, numShards, getSink().getExtension()));
       }
 
       int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
@@ -671,18 +615,18 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      *
      * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
      *
-     * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
-     * will be the same as the sorted order of the input filenames. In other words (when using
-     * {@link DefaultFilenamePolicy}), if the input filenames are ["C", "A", "B"], baseFilename (int
-     * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
-     * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
-     * will be copied to dir/file-001-of-003.txt, etc.
+     * <p>Files will be named according to the file naming template. The order of the output files
+     * will be the same as the sorted order of the input filenames.  In other words, if the input
+     * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and
+     * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to
+     * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
      *
      * @param filenames the filenames of temporary files.
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
-    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
+    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames)
+        throws IOException {
       int numFiles = filenames.size();
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
@@ -754,8 +698,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       }
     }
 
-    /** Returns the FileBasedSink for this write operation. */
-    public FileBasedSink<OutputT, DestinationT> getSink() {
+    /**
+     * Returns the FileBasedSink for this write operation.
+     */
+    public FileBasedSink<T> getSink() {
       return sink;
     }
 
@@ -773,28 +719,33 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
   }
 
-  /** Returns the {@link WritableByteChannelFactory} used. */
-  protected final WritableByteChannelFactory getWritableByteChannelFactory() {
-    return writableByteChannelFactory;
+  /** Returns the extension that will be written to the produced files. */
+  protected final String getExtension() {
+    String extension = MoreObjects.firstNonNull(writableByteChannelFactory.getFilenameSuffix(), "");
+    if (!extension.isEmpty() && !extension.startsWith(".")) {
+      extension = "." + extension;
+    }
+    return extension;
   }
 
   /**
-   * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass implementations
-   * provide a method that can write a single value to a {@link WritableByteChannel}.
+   * Abstract writer that writes a bundle to a {@link FileBasedSink}. Subclass
+   * implementations provide a method that can write a single value to a
+   * {@link WritableByteChannel}.
    *
    * <p>Subclass implementations may also override methods that write headers and footers before and
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
    * channel.
    *
-   * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore any
-   * access to static members or methods should be thread safe.
+   * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore
+   * any access to static members or methods should be thread safe.
    *
-   * @param <OutputT> the type of values to write.
+   * @param <T> the type of values to write.
    */
-  public abstract static class Writer<OutputT, DestinationT> {
+  public abstract static class Writer<T> {
     private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
-    private final WriteOperation<OutputT, DestinationT> writeOperation;
+    private final WriteOperation<T> writeOperation;
 
     /** Unique id for this output bundle. */
     private String id;
@@ -802,7 +753,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     private BoundedWindow window;
     private PaneInfo paneInfo;
     private int shard = -1;
-    private DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
@@ -822,8 +772,10 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      */
     private final String mimeType;
 
-    /** Construct a new {@link Writer} that will produce files of the given MIME type. */
-    public Writer(WriteOperation<OutputT, DestinationT> writeOperation, String mimeType) {
+    /**
+     * Construct a new {@link Writer} that will produce files of the given MIME type.
+     */
+    public Writer(WriteOperation<T> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
       this.mimeType = mimeType;
@@ -866,29 +818,28 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * id populated for the case of static sharding. In cases where the runner is dynamically
      * picking sharding, shard might be set to -1.
      */
-    public final void openWindowed(
-        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
+    public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
         throws Exception {
       if (!getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openWindowed called a non-windowed sink.");
       }
-      open(uId, window, paneInfo, shard, destination);
+      open(uId, window, paneInfo, shard);
     }
 
     /**
      * Called for each value in the bundle.
      */
-    public abstract void write(OutputT value) throws Exception;
+    public abstract void write(T value) throws Exception;
 
     /**
-     * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
+     * Similar to {@link #openWindowed} however for the case where unwindowed writes were
+     * requested.
      */
-    public final void openUnwindowed(String uId, int shard, DestinationT destination)
-        throws Exception {
+    public final void openUnwindowed(String uId, int shard) throws Exception {
       if (getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openUnwindowed called a windowed sink.");
       }
-      open(uId, null, null, shard, destination);
+      open(uId, null, null, shard);
     }
 
     // Helper function to close a channel, on exception cases.
@@ -904,18 +855,14 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       }
     }
 
-    private void open(
-        String uId,
-        @Nullable BoundedWindow window,
-        @Nullable PaneInfo paneInfo,
-        int shard,
-        DestinationT destination)
-        throws Exception {
+    private void open(String uId,
+                      @Nullable BoundedWindow window,
+                      @Nullable PaneInfo paneInfo,
+                      int shard) throws Exception {
       this.id = uId;
       this.window = window;
       this.paneInfo = paneInfo;
       this.shard = shard;
-      this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
       verifyNotNull(
@@ -961,7 +908,7 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     /** Closes the channel and returns the bundle result. */
-    public final FileResult<DestinationT> close() throws Exception {
+    public final FileResult close() throws Exception {
       checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
 
       LOG.debug("Writing footer to {}.", outputFile);
@@ -991,41 +938,35 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
         throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
       }
 
-      FileResult<DestinationT> result =
-          new FileResult<>(outputFile, shard, window, paneInfo, destination);
+      FileResult result = new FileResult(outputFile, shard, window, paneInfo);
       LOG.debug("Result for bundle {}: {}", this.id, outputFile);
       return result;
     }
 
-    /** Return the WriteOperation that this Writer belongs to. */
-    public WriteOperation<OutputT, DestinationT> getWriteOperation() {
+    /**
+     * Return the WriteOperation that this Writer belongs to.
+     */
+    public WriteOperation<T> getWriteOperation() {
       return writeOperation;
     }
   }
 
   /**
-   * Result of a single bundle write. Contains the filename produced by the bundle, and if known the
-   * final output filename.
+   * Result of a single bundle write. Contains the filename produced by the bundle, and if known
+   * the final output filename.
    */
-  public static final class FileResult<DestinationT> {
+  public static final class FileResult {
     private final ResourceId tempFilename;
     private final int shard;
     private final BoundedWindow window;
     private final PaneInfo paneInfo;
-    private final DestinationT destination;
 
     @Experimental(Kind.FILESYSTEM)
-    public FileResult(
-        ResourceId tempFilename,
-        int shard,
-        BoundedWindow window,
-        PaneInfo paneInfo,
-        DestinationT destination) {
+    public FileResult(ResourceId tempFilename, int shard, BoundedWindow window, PaneInfo paneInfo) {
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
       this.paneInfo = paneInfo;
-      this.destination = destination;
     }
 
     @Experimental(Kind.FILESYSTEM)
@@ -1037,8 +978,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       return shard;
     }
 
-    public FileResult<DestinationT> withShard(int shard) {
-      return new FileResult<>(tempFilename, shard, window, paneInfo, destination);
+    public FileResult withShard(int shard) {
+      return new FileResult(tempFilename, shard, window, paneInfo);
     }
 
     public BoundedWindow getWindow() {
@@ -1049,24 +990,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       return paneInfo;
     }
 
-    public DestinationT getDestination() {
-      return destination;
-    }
-
     @Experimental(Kind.FILESYSTEM)
-    public ResourceId getDestinationFile(
-        DynamicDestinations<?, DestinationT> dynamicDestinations,
-        int numShards,
-        OutputFileHints outputFileHints) {
+    public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
+                                         int numShards, String extension) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);
       checkArgument(numShards > 0);
-      FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
       if (getWindow() != null) {
-        return policy.windowedFilename(
-            new WindowedContext(getWindow(), getPaneInfo(), getShard(), numShards),
-            outputFileHints);
+        return policy.windowedFilename(outputDirectory, new WindowedContext(
+            getWindow(), getPaneInfo(), getShard(), numShards), extension);
       } else {
-        return policy.unwindowedFilename(new Context(getShard(), numShards), outputFileHints);
+        return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
+            extension);
       }
     }
 
@@ -1080,24 +1014,22 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
   }
 
-  /** A coder for {@link FileResult} objects. */
-  public static final class FileResultCoder<DestinationT>
-      extends StructuredCoder<FileResult<DestinationT>> {
+  /**
+   * A coder for {@link FileResult} objects.
+   */
+  public static final class FileResultCoder extends StructuredCoder<FileResult> {
     private static final Coder<String> FILENAME_CODER = StringUtf8Coder.of();
     private static final Coder<Integer> SHARD_CODER = VarIntCoder.of();
     private static final Coder<PaneInfo> PANE_INFO_CODER = NullableCoder.of(PaneInfoCoder.INSTANCE);
+
     private final Coder<BoundedWindow> windowCoder;
-    private final Coder<DestinationT> destinationCoder;
 
-    protected FileResultCoder(
-        Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
+    protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
       this.windowCoder = NullableCoder.of(windowCoder);
-      this.destinationCoder = destinationCoder;
     }
 
-    public static <DestinationT> FileResultCoder<DestinationT> of(
-        Coder<BoundedWindow> windowCoder, Coder<DestinationT> destinationCoder) {
-      return new FileResultCoder<>(windowCoder, destinationCoder);
+    public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
+      return new FileResultCoder(windowCoder);
     }
 
     @Override
@@ -1106,7 +1038,8 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
     }
 
     @Override
-    public void encode(FileResult<DestinationT> value, OutputStream outStream) throws IOException {
+    public void encode(FileResult value, OutputStream outStream)
+        throws IOException {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
@@ -1114,22 +1047,17 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       windowCoder.encode(value.getWindow(), outStream);
       PANE_INFO_CODER.encode(value.getPaneInfo(), outStream);
       SHARD_CODER.encode(value.getShard(), outStream);
-      destinationCoder.encode(value.getDestination(), outStream);
     }
 
     @Override
-    public FileResult<DestinationT> decode(InputStream inStream) throws IOException {
+    public FileResult decode(InputStream inStream)
+        throws IOException {
       String tempFilename = FILENAME_CODER.decode(inStream);
       BoundedWindow window = windowCoder.decode(inStream);
       PaneInfo paneInfo = PANE_INFO_CODER.decode(inStream);
       int shard = SHARD_CODER.decode(inStream);
-      DestinationT destination = destinationCoder.decode(inStream);
-      return new FileResult<>(
-          FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
-          shard,
-          window,
-          paneInfo,
-          destination);
+      return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+          shard, window, paneInfo);
     }
 
     @Override
@@ -1138,15 +1066,25 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
       windowCoder.verifyDeterministic();
       PANE_INFO_CODER.verifyDeterministic();
       SHARD_CODER.verifyDeterministic();
-      destinationCoder.verifyDeterministic();
     }
   }
 
   /**
-   * Provides hints about how to generate output files, such as a suggested filename suffix (e.g.
-   * based on the compression type), and the file MIME type.
+   * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
+   * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
+   * would normally be written directly to the {@link WritableByteChannel} passed into
+   * {@link WritableByteChannelFactory#create(WritableByteChannel)}.
+   *
+   * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
+   * building {@link DisplayData}.
    */
-  public interface OutputFileHints extends Serializable {
+  public interface WritableByteChannelFactory extends Serializable {
+    /**
+     * @param channel the {@link WritableByteChannel} to wrap
+     * @return the {@link WritableByteChannel} to be used during output
+     */
+    WritableByteChannel create(WritableByteChannel channel) throws IOException;
+
     /**
      * Returns the MIME type that should be used for the files that will hold the output data. May
      * return {@code null} if this {@code WritableByteChannelFactory} does not meaningfully change
@@ -1163,23 +1101,6 @@ public abstract class FileBasedSink<OutputT, DestinationT> implements Serializab
      * @return an optional filename suffix, eg, ".gz" is returned by {@link CompressionType#GZIP}
      */
     @Nullable
-    String getSuggestedFilenameSuffix();
-  }
-
-  /**
-   * Implementations create instances of {@link WritableByteChannel} used by {@link FileBasedSink}
-   * and related classes to allow <em>decorating</em>, or otherwise transforming, the raw data that
-   * would normally be written directly to the {@link WritableByteChannel} passed into {@link
-   * WritableByteChannelFactory#create(WritableByteChannel)}.
-   *
-   * <p>Subclasses should override {@link #toString()} with something meaningful, as it is used when
-   * building {@link DisplayData}.
-   */
-  public interface WritableByteChannelFactory extends OutputFileHints {
-    /**
-     * @param channel the {@link WritableByteChannel} to wrap
-     * @return the {@link WritableByteChannel} to be used during output
-     */
-    WritableByteChannel create(WritableByteChannel channel) throws IOException;
+    String getFilenameSuffix();
   }
 }
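
For reference, the FileBasedSink changes above restore the pre-#3553 FilenamePolicy
contract, in which the base output directory and a (possibly empty) extension are passed
into the policy rather than an OutputFileHints object. A minimal sketch of a policy
written against those restored signatures follows; the class name, the filename format,
and the Context accessors used (getWindow, getPaneInfo, getShardNumber, getNumShards) are
illustrative assumptions, not part of this commit.

    import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    // Sketch only: resolves per-shard (and, for windowed writes, per-window/pane) filenames
    // under the directory handed to the policy by the sink.
    public class ExampleFilenamePolicy extends FilenamePolicy {
      @Override
      public ResourceId windowedFilename(
          ResourceId outputDirectory, WindowedContext c, String extension) {
        // Encode window, pane and shard so redundant or retried bundles cannot collide.
        String filename = String.format("out-%s-pane-%d-%05d-of-%05d%s",
            c.getWindow(), c.getPaneInfo().getIndex(),
            c.getShardNumber(), c.getNumShards(), extension);
        return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }

      @Override
      public ResourceId unwindowedFilename(
          ResourceId outputDirectory, Context c, String extension) {
        String filename = String.format("out-%05d-of-%05d%s",
            c.getShardNumber(), c.getNumShards(), extension);
        return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
      }
    }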

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index c3687a9..05f0d97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NoSuchElementException;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.io.range.OffsetRangeTracker;
 import org.apache.beam.sdk.io.range.RangeTracker;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -111,7 +110,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
   @Override
   public List<? extends OffsetBasedSource<T>> split(
       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // Split the range into bundles based on the desiredBundleSizeBytes. If the desired bundle
+    // Split the range into bundles based on the desiredBundleSizeBytes. Final bundle is adjusted to
+    // make sure that we do not end up with a too small bundle at the end. If the desired bundle
     // size is smaller than the minBundleSize of the source then minBundleSize will be used instead.
 
     long desiredBundleSizeOffsetUnits = Math.max(
@@ -119,10 +119,20 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
         minBundleSize);
 
     List<OffsetBasedSource<T>> subSources = new ArrayList<>();
-    for (OffsetRange range :
-        new OffsetRange(startOffset, Math.min(endOffset, getMaxEndOffset(options)))
-            .split(desiredBundleSizeOffsetUnits, minBundleSize)) {
-      subSources.add(createSourceForSubrange(range.getFrom(), range.getTo()));
+    long start = startOffset;
+    long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
+
+    while (start < maxEnd) {
+      long end = start + desiredBundleSizeOffsetUnits;
+      end = Math.min(end, maxEnd);
+      // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
+      long remaining = maxEnd - end;
+      if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
+        end = maxEnd;
+      }
+      subSources.add(createSourceForSubrange(start, end));
+
+      start = end;
     }
     return subSources;
   }
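
The restored OffsetBasedSource.split() above replaces the OffsetRange-based splitting with
an explicit loop that folds a too-small trailing bundle into its predecessor. As a rough,
self-contained sketch of that arithmetic over plain offsets (the helper name, the long[]
pair representation, and the example numbers are illustrative assumptions):

    import java.util.ArrayList;
    import java.util.List;

    public class OffsetSplitSketch {
      // Returns [from, to) pairs covering [start, maxEnd), mirroring the restored loop:
      // the last bundle absorbs any remainder smaller than desired/4 or minBundleSize.
      static List<long[]> splitOffsets(
          long start, long maxEnd, long desiredBundleSize, long minBundleSize) {
        long desiredUnits = Math.max(desiredBundleSize, minBundleSize);
        List<long[]> ranges = new ArrayList<>();
        while (start < maxEnd) {
          long end = Math.min(start + desiredUnits, maxEnd);
          long remaining = maxEnd - end;
          // Avoid a tiny bundle at the end and respect minBundleSize.
          if (remaining < desiredUnits / 4 || remaining < minBundleSize) {
            end = maxEnd;
          }
          ranges.add(new long[] {start, end});
          start = end;
        }
        return ranges;
      }

      public static void main(String[] args) {
        // [0, 105) with desired size 50: the 5 units left after [50, 100) are below
        // 50 / 4, so the final bundle is extended and the result is [0, 50), [50, 105).
        for (long[] r : splitOffsets(0, 105, 50, 1)) {
          System.out.println(r[0] + ".." + r[1]);
        }
      }
    }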

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6e7b243..e288075 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -45,7 +45,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.values.PBegin;
@@ -356,11 +355,12 @@ public class TFRecordIO {
     public PDone expand(PCollection<byte[]> input) {
       checkState(getOutputPrefix() != null,
           "need to set the output prefix of a TFRecordIO.Write transform");
-      WriteFiles<byte[], Void, byte[]> write =
-          WriteFiles.<byte[], Void, byte[]>to(
+      WriteFiles<byte[]> write = WriteFiles.to(
               new TFRecordSink(
-                  getOutputPrefix(), getShardTemplate(), getFilenameSuffix(), getCompressionType()),
-              SerializableFunctions.<byte[]>identity());
+                  getOutputPrefix(),
+                  getShardTemplate(),
+                  getFilenameSuffix(),
+                  getCompressionType()));
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
@@ -546,20 +546,20 @@ public class TFRecordIO {
     }
   }
 
-  /** A {@link FileBasedSink} for TFRecord files. Produces TFRecord files. */
+  /**
+   * A {@link FileBasedSink} for TFRecord files. Produces TFRecord files.
+   */
   @VisibleForTesting
-  static class TFRecordSink extends FileBasedSink<byte[], Void> {
+  static class TFRecordSink extends FileBasedSink<byte[]> {
     @VisibleForTesting
-    TFRecordSink(
-        ValueProvider<ResourceId> outputPrefix,
+    TFRecordSink(ValueProvider<ResourceId> outputPrefix,
         @Nullable String shardTemplate,
         @Nullable String suffix,
         TFRecordIO.CompressionType compressionType) {
       super(
           outputPrefix,
-          DynamicFileDestinations.constant(
-              DefaultFilenamePolicy.fromStandardParameters(
-                  outputPrefix, shardTemplate, suffix, false)),
+          DefaultFilenamePolicy.constructUsingStandardParameters(
+              outputPrefix, shardTemplate, suffix, false),
           writableByteChannelFactory(compressionType));
     }
 
@@ -571,7 +571,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public WriteOperation<byte[], Void> createWriteOperation() {
+    public WriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 
@@ -590,24 +590,30 @@ public class TFRecordIO {
       return CompressionType.UNCOMPRESSED;
     }
 
-    /** A {@link WriteOperation WriteOperation} for TFRecord files. */
-    private static class TFRecordWriteOperation extends WriteOperation<byte[], Void> {
+    /**
+     * A {@link WriteOperation
+     * WriteOperation} for TFRecord files.
+     */
+    private static class TFRecordWriteOperation extends WriteOperation<byte[]> {
       private TFRecordWriteOperation(TFRecordSink sink) {
         super(sink);
       }
 
       @Override
-      public Writer<byte[], Void> createWriter() throws Exception {
+      public Writer<byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }
 
-    /** A {@link Writer Writer} for TFRecord files. */
-    private static class TFRecordWriter extends Writer<byte[], Void> {
+    /**
+     * A {@link Writer Writer}
+     * for TFRecord files.
+     */
+    private static class TFRecordWriter extends Writer<byte[]> {
       private WritableByteChannel outChannel;
       private TFRecordCodec codec;
 
-      private TFRecordWriter(WriteOperation<byte[], Void> writeOperation) {
+      private TFRecordWriter(WriteOperation<byte[]> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
       }
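
The TFRecordIO hunks above show the sink, write operation, and writer reverting to a
single type parameter. For orientation, here is a hedged sketch of what a Writer<T>
subclass looks like against that restored surface; the class name, the newline framing,
and the assumption that prepareWrite(WritableByteChannel) is the hook that receives the
(possibly compressed) channel are illustrative, not taken from this commit.

    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
    import org.apache.beam.sdk.io.FileBasedSink.Writer;
    import org.apache.beam.sdk.util.MimeTypes;

    // Sketch only: writes each String element of a bundle as one line to the sink's channel.
    class LineWriter extends Writer<String> {
      private WritableByteChannel channel;

      LineWriter(WriteOperation<String> writeOperation) {
        super(writeOperation, MimeTypes.TEXT);
      }

      @Override
      protected void prepareWrite(WritableByteChannel channel) throws Exception {
        // Assumed hook: called once per bundle before any write(value) calls.
        this.channel = channel;
      }

      @Override
      public void write(String value) throws Exception {
        channel.write(ByteBuffer.wrap((value + "\n").getBytes(StandardCharsets.UTF_8)));
      }
    }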