You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/05/10 19:20:28 UTC

[4/5] beam git commit: Implement dynamic-sharding for windowed file outputs, and add an integration test.

Implement dynamic-sharding for windowed file outputs, and add an integration test.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2d5fbf6a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2d5fbf6a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2d5fbf6a

Branch: refs/heads/master
Commit: 2d5fbf6a3d9df29c9cd3a96caabdb396f1857e75
Parents: 5bbc042
Author: Reuven Lax <re...@google.com>
Authored: Wed Apr 5 12:13:44 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed May 10 12:18:41 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/WindowedWordCount.java |   7 +-
 .../examples/common/WriteOneFilePerWindow.java  |  24 +-
 .../beam/examples/WindowedWordCountIT.java      |  23 +-
 ...aultCoderCloudObjectTranslatorRegistrar.java |   2 -
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 225 +++++++++++--------
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 144 +++++++-----
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 148 ++++++------
 7 files changed, 332 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 45746af..20b48e4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -161,12 +161,15 @@ public class WindowedWordCount {
     @Default.InstanceFactory(DefaultToMinTimestampPlusOneHour.class)
     Long getMaxTimestampMillis();
     void setMaxTimestampMillis(Long value);
+
+    @Description("Fixed number of shards to produce per window, or null for runner-chosen sharding")
+    Integer getNumShards();
+    void setNumShards(Integer numShards);
   }
 
   public static void main(String[] args) throws IOException {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     final String output = options.getOutput();
-    final Duration windowSize = Duration.standardMinutes(options.getWindowSize());
     final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
     final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
 
@@ -207,7 +210,7 @@ public class WindowedWordCount {
      */
     wordCounts
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply(new WriteOneFilePerWindow(output));
+        .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
 
     PipelineResult result = pipeline.run();
     try {

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index ed35c8a..5e6df9c 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
 
 import static com.google.common.base.Verify.verifyNotNull;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.TextIO;
@@ -40,12 +41,14 @@ import org.joda.time.format.ISODateTimeFormat;
  * lessons.
  */
 public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
-
   private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
-  private final String filenamePrefix;
+  private String filenamePrefix;
+  @Nullable
+  private Integer numShards;
 
-  public WriteOneFilePerWindow(String filenamePrefix) {
+  public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
     this.filenamePrefix = filenamePrefix;
+    this.numShards = numShards;
   }
 
   @Override
@@ -61,12 +64,15 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
           resource);
     }
 
-    return input.apply(
-        TextIO.write()
-            .to(resource.getCurrentDirectory())
-            .withFilenamePolicy(new PerWindowFiles(prefix))
-            .withWindowedWrites()
-            .withNumShards(3));
+
+    TextIO.Write write = TextIO.write()
+        .to(resource.getCurrentDirectory())
+        .withFilenamePolicy(new PerWindowFiles(prefix))
+        .withWindowedWrites();
+    if (numShards != null) {
+      write = write.withNumShards(numShards);
+    }
+    return input.apply(write);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 93c4543..eb7e4c4 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -86,14 +86,29 @@ public class WindowedWordCountIT {
   }
 
   @Test
-  public void testWindowedWordCountInBatch() throws Exception {
-    testWindowedWordCountPipeline(defaultOptions());
+  public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
+    WindowedWordCountITOptions options = batchOptions();
+    // This is the default value, but make it explicit
+    options.setNumShards(null);
+    testWindowedWordCountPipeline(options);
   }
 
   @Test
+  public void testWindowedWordCountInBatchStaticSharding() throws Exception {
+    WindowedWordCountITOptions options = batchOptions();
+    options.setNumShards(3);
+    testWindowedWordCountPipeline(options);
+  }
+
+  // TODO: add a test with streaming and dynamic sharding after resolving
+  // https://issues.apache.org/jira/browse/BEAM-1438
+
+  @Test
   @Category(StreamingIT.class)
-  public void testWindowedWordCountInStreaming() throws Exception {
-    testWindowedWordCountPipeline(streamingOptions());
+  public void testWindowedWordCountInStreamingStaticSharding() throws Exception {
+    WindowedWordCountITOptions options = streamingOptions();
+    options.setNumShards(3);
+    testWindowedWordCountPipeline(options);
   }
 
   private WindowedWordCountITOptions defaultOptions() throws Exception {

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
index 4567098..5d42a5f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 
@@ -91,7 +90,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           ByteCoder.class,
           DoubleCoder.class,
           DurationCoder.class,
-          FileResultCoder.class,
           FooterCoder.class,
           InstantCoder.class,
           IsmShardCoder.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c2e230d..2117794 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -34,7 +34,9 @@ import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -43,10 +45,12 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.WindowedContext;
 import org.apache.beam.sdk.io.fs.MatchResult;
@@ -63,6 +67,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
@@ -306,8 +311,7 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   }
 
   /** The policy used to generate names of files to be produced. */
-  @VisibleForTesting
-  final FilenamePolicy filenamePolicy;
+  private final FilenamePolicy filenamePolicy;
   /** The directory to which files will be written. */
   private final ValueProvider<ResourceId> baseOutputDirectoryProvider;
 
@@ -523,27 +527,43 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
 
     protected final Map<ResourceId, ResourceId> buildOutputFilenames(
         Iterable<FileResult> writerResults) {
+      int numShards = Iterables.size(writerResults);
       Map<ResourceId, ResourceId> outputFilenames = new HashMap<>();
-      List<ResourceId> files = new ArrayList<>();
+
+      List<FileResult> unshardedFiles = new ArrayList<>();
+      FilenamePolicy policy = getSink().getFilenamePolicy();
       for (FileResult result : writerResults) {
-        if (result.getDestinationFilename() != null) {
-          outputFilenames.put(result.getFilename(), result.getDestinationFilename());
+        if (result.getShard() != WriteFiles.UNKNOWN_SHARDNUM) {
+          outputFilenames.put(result.getTempFilename(),
+              result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
+                  numShards, getSink().getExtension()));
         } else {
-          files.add(result.getFilename());
+          unshardedFiles.add(result);
         }
       }
 
       // writerResults won't contain destination filenames, so we dynamically generate them here.
-      if (files.size() > 0) {
+      if (unshardedFiles.size() > 0) {
         checkArgument(outputFilenames.isEmpty());
-        // Sort files for idempotence.
-        files = Ordering.usingToString().sortedCopy(files);
-        ResourceId outputDirectory = getSink().getBaseOutputDirectoryProvider().get();
-        FilenamePolicy filenamePolicy = getSink().filenamePolicy;
-        for (int i = 0; i < files.size(); i++) {
-          outputFilenames.put(files.get(i),
-              filenamePolicy.unwindowedFilename(outputDirectory, new Context(i, files.size()),
-                  getSink().getExtension()));
+
+        // Sort files for idempotence. Sort by temporary filename.
+        // Note that this codepath should not be used when processing triggered windows. In the
+        // case of triggers, the list of FileResult objects in the Finalize iterable is not
+        // deterministic, and might change over retries. This breaks the assumption below that
+        // sorting the FileResult objects provides idempotency.
+        unshardedFiles = Ordering.from(new Comparator<FileResult>() {
+          @Override
+          public int compare(FileResult first, FileResult second) {
+            return first.getTempFilename().toString().compareTo(
+                second.getTempFilename().toString());
+          }
+        }).sortedCopy(unshardedFiles);
+        for (int i = 0; i < unshardedFiles.size(); i++) {
+          FileResult result = unshardedFiles.get(i);
+          result.setShard(i);
+          outputFilenames.put(result.getTempFilename(),
+              result.getDestinationFile(policy, getSink().getBaseOutputDirectoryProvider().get(),
+                  numShards, getSink().getExtension()));
         }
       }
 
@@ -647,6 +667,19 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     public FileBasedSink<T> getSink() {
       return sink;
     }
+
+    @Override
+    public String toString() {
+      String tempDirectoryStr =
+          tempDirectory.isAccessible() ? tempDirectory.get().toString() : tempDirectory.toString();
+      return getClass().getSimpleName()
+          + "{"
+          + "tempDirectory="
+          + tempDirectoryStr
+          + ", windowedWrites="
+          + windowedWrites
+          + '}';
+    }
   }
 
   /** Returns the extension that will be written to the produced files. */
@@ -683,7 +716,6 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private BoundedWindow window;
     private PaneInfo paneInfo;
     private int shard = -1;
-    private int numShards = -1;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
@@ -738,24 +770,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     protected void finishWrite() throws Exception {}
 
     /**
-     *  Performs bundle initialization. For example, creates a temporary file for writing or
+     * Performs bundle initialization. For example, creates a temporary file for writing or
      * initializes any state that will be used across calls to {@link Writer#write}.
      *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output
-     * does not interfere with the output of other Writers, as a bundle may be executed many
-     * times for fault tolerance.
+     * <p>The unique id that is given to open should be used to ensure that the writer's output does
+     * not interfere with the output of other Writers, as a bundle may be executed many times for
+     * fault tolerance.
      *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested.
-     * shard and numShards are populated for the case of static sharding. In cases where the
-     * runner is dynamically picking sharding, shard and numShards might both be set to -1.
+     * <p>The window and paneInfo arguments are populated when windowed writes are requested. The
+     * shard is populated for the case of static sharding. In cases where the runner is dynamically
+     * picking sharding, shard might be set to -1.
      */
-    public final void openWindowed(
-        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, int numShards)
+    public final void openWindowed(String uId, BoundedWindow window, PaneInfo paneInfo, int shard)
         throws Exception {
       if (!getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openWindowed called a non-windowed sink.");
       }
-      open(uId, window, paneInfo, shard, numShards);
+      open(uId, window, paneInfo, shard);
     }
 
     /**
@@ -767,13 +798,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
      * Similar to {@link #openWindowed} however for the case where unwindowed writes were
      * requested.
      */
-    public final void openUnwindowed(String uId,
-                                     int shard,
-                                     int numShards) throws Exception {
+    public final void openUnwindowed(String uId, int shard) throws Exception {
       if (getWriteOperation().windowedWrites) {
         throw new IllegalStateException("openUnwindowed called a windowed sink.");
       }
-      open(uId, null, null, shard, numShards);
+      open(uId, null, null, shard);
     }
 
     // Helper function to close a channel, on exception cases.
@@ -792,13 +821,11 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
     private void open(String uId,
                       @Nullable BoundedWindow window,
                       @Nullable PaneInfo paneInfo,
-                      int shard,
-                      int numShards) throws Exception {
+                      int shard) throws Exception {
       this.id = uId;
       this.window = window;
       this.paneInfo = paneInfo;
       this.shard = shard;
-      this.numShards = numShards;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
       verifyNotNull(
@@ -874,23 +901,8 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
         throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
       }
 
-      FileBasedSink<T> sink = getWriteOperation().getSink();
-      ResourceId outputDirectory = sink.getBaseOutputDirectoryProvider().get();
-      FilenamePolicy filenamePolicy = sink.filenamePolicy;
-      String extension = sink.getExtension();
-      @Nullable ResourceId destinationFile;
-      if (window != null) {
-        destinationFile = filenamePolicy.windowedFilename(outputDirectory, new WindowedContext(
-            window, paneInfo, shard, numShards), extension);
-      } else if (numShards > 0) {
-        destinationFile = filenamePolicy.unwindowedFilename(
-            outputDirectory, new Context(shard, numShards), extension);
-      } else {
-        // Destination filename to be generated in the next step.
-        destinationFile = null;
-      }
-      FileResult result = new FileResult(outputFile, destinationFile);
-      LOG.debug("Result for bundle {}: {} {}", this.id, outputFile, destinationFile);
+      FileResult result = new FileResult(outputFile, shard, window, paneInfo);
+      LOG.debug("Result for bundle {}: {}", this.id, outputFile);
       return result;
     }
 
@@ -906,31 +918,56 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
    * Result of a single bundle write. Contains the filename produced by the bundle, and if known
    * the final output filename.
    */
-  public static final class FileResult implements Serializable {
-    private final ResourceId filename;
-    @Nullable private final ResourceId destinationFilename;
+  public static final class FileResult {
+    private final ResourceId tempFilename;
+    private int shard;
+    private BoundedWindow window;
+    private PaneInfo paneInfo;
 
-    public FileResult(ResourceId filename, @Nullable ResourceId destinationFilename) {
-      this.filename = filename;
-      this.destinationFilename = destinationFilename;
+    public FileResult(ResourceId tempFilename, int shard, BoundedWindow window,
+                      PaneInfo paneInfo) {
+      this.tempFilename = tempFilename;
+      this.shard = shard;
+      this.window = window;
+      this.paneInfo = paneInfo;
     }
 
-    public ResourceId getFilename() {
-      return filename;
+    public ResourceId getTempFilename() {
+      return tempFilename;
     }
 
-    /**
-     * The filename to be written. Will be null if the output filename is unknown because the number
-     * of shards is determined dynamically by the runner.
-     */
-    @Nullable public ResourceId getDestinationFilename() {
-      return destinationFilename;
+    public int getShard() {
+      return shard;
+    }
+
+    public void setShard(int shard) {
+      this.shard = shard;
+    }
+
+    public BoundedWindow getWindow() {
+      return window;
+    }
+
+    public PaneInfo getPaneInfo() {
+      return paneInfo;
+    }
+
+    public ResourceId getDestinationFile(FilenamePolicy policy, ResourceId outputDirectory,
+                                         int numShards, String extension) {
+      checkArgument(getShard() != WriteFiles.UNKNOWN_SHARDNUM);
+      checkArgument(numShards > 0);
+      if (getWindow() != null) {
+        return policy.windowedFilename(outputDirectory, new WindowedContext(
+            getWindow(), getPaneInfo(), getShard(), numShards), extension);
+      } else {
+        return policy.unwindowedFilename(outputDirectory, new Context(getShard(), numShards),
+            extension);
+      }
     }
 
     public String toString() {
-      return MoreObjects.toStringHelper(FileResult.class)
-          .add("filename", filename)
-          .add("destinationFilename", destinationFilename)
+      return MoreObjects.toStringHelper(FileBasedSink.FileResult.class)
+          .add("tempFilename", tempFilename)
           .toString();
     }
   }
@@ -938,12 +975,23 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
   /**
    * A coder for {@link FileResult} objects.
    */
-  public static final class FileResultCoder extends AtomicCoder<FileResult> {
-    private static final FileResultCoder INSTANCE = new FileResultCoder();
-    private final NullableCoder<String> stringCoder = NullableCoder.of(StringUtf8Coder.of());
+  public static final class FileResultCoder extends StructuredCoder<FileResult> {
+    private final Coder<String> stringCoder = StringUtf8Coder.of();
+    private final Coder<Integer> integerCoder = VarIntCoder.of();
+    private final Coder<PaneInfo> paneInfoCoder = NullableCoder.of(PaneInfoCoder.INSTANCE);
+    private final Coder<BoundedWindow> windowCoder;
+
+    protected FileResultCoder(Coder<BoundedWindow> windowCoder) {
+      this.windowCoder = NullableCoder.of(windowCoder);
+    }
+
+    public static FileResultCoder of(Coder<BoundedWindow> windowCoder) {
+      return new FileResultCoder(windowCoder);
+    }
 
-    public static FileResultCoder of() {
-      return INSTANCE;
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.asList(windowCoder);
     }
 
     @Override
@@ -952,29 +1000,30 @@ public abstract class FileBasedSink<T> implements Serializable, HasDisplayData {
       if (value == null) {
         throw new CoderException("cannot encode a null value");
       }
-      stringCoder.encode(value.getFilename().toString(), outStream);
-      if (value.getDestinationFilename() == null) {
-        stringCoder.encode(null, outStream);
-      } else {
-        stringCoder.encode(value.getDestinationFilename().toString(), outStream);
-      }
+      stringCoder.encode(value.getTempFilename().toString(), outStream);
+      windowCoder.encode(value.getWindow(), outStream);
+      paneInfoCoder.encode(value.getPaneInfo(), outStream);
+      integerCoder.encode(value.getShard(), outStream);
     }
 
     @Override
-    public FileResult decode(InputStream inStream) throws IOException {
-      String filename = stringCoder.decode(inStream);
-      assert filename != null;  // fixes a compiler warning
-      @Nullable String destinationFilename = stringCoder.decode(inStream);
-      return new FileResult(
-          FileSystems.matchNewResource(filename, false /* isDirectory */),
-          destinationFilename == null
-              ? null
-              : FileSystems.matchNewResource(destinationFilename, false /* isDirectory */));
+    public FileResult decode(InputStream inStream)
+        throws IOException {
+      String tempFilename = stringCoder.decode(inStream);
+      assert tempFilename != null;  // fixes a compiler warning
+      BoundedWindow window = windowCoder.decode(inStream);
+      PaneInfo paneInfo = paneInfoCoder.decode(inStream);
+      int shard = integerCoder.decode(inStream);
+      return new FileResult(FileSystems.matchNewResource(tempFilename, false /* isDirectory */),
+          shard, window, paneInfo);
     }
 
     @Override
     public void verifyDeterministic() throws NonDeterministicException {
       stringCoder.verifyDeterministic();
+      windowCoder.verifyDeterministic();
+      paneInfoCoder.verifyDeterministic();
+      integerCoder.verifyDeterministic();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index af24a8f..cab5862 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,12 +22,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -48,6 +51,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -82,9 +86,7 @@ import org.slf4j.LoggerFactory;
 public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
-  private static final int UNKNOWN_SHARDNUM = -1;
-  private static final int UNKNOWN_NUMSHARDS = -1;
-
+  static final int UNKNOWN_SHARDNUM = -1;
   private FileBasedSink<T> sink;
   private WriteOperation<T> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
@@ -119,9 +121,18 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
 
   @Override
   public PDone expand(PCollection<T> input) {
-    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
-        "%s can only be applied to an unbounded PCollection if doing windowed writes",
-        WriteFiles.class.getSimpleName());
+    if (input.isBounded() == IsBounded.UNBOUNDED) {
+      checkArgument(windowedWrites,
+          "Must use windowed writes when applying %s to an unbounded PCollection",
+          WriteFiles.class.getSimpleName());
+      // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+      // and similar behavior in other runners.
+      checkArgument(
+          computeNumShards != null || numShardsProvider != null,
+          "When applying %s to an unbounded PCollection, "
+              + "must specify number of output shards explicitly",
+          WriteFiles.class.getSimpleName());
+    }
     this.writeOperation = sink.createWriteOperation();
     this.writeOperation.setWindowedWrites(windowedWrites);
     return createWrite(input);
@@ -246,26 +257,50 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     // initialized in processElement.
     private Writer<T> writer = null;
     private BoundedWindow window = null;
+    private Map<KV<BoundedWindow, PaneInfo>, Writer<T>> windowedWriters;
 
-    WriteBundles() {
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      // Lazily initialize the Writer
-      if (writer == null) {
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        writer = writeOperation.createWriter();
-
-        if (windowedWrites) {
-          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
-              UNKNOWN_NUMSHARDS);
-        } else {
-          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+    Writer<T> getWriter(BoundedWindow window, PaneInfo paneInfo) throws Exception {
+      Writer<T> writer;
+      if (windowedWrites) {
+        // If we are doing windowed writes, we need to ensure that we have separate files for
+        // data in different windows.
+        KV<BoundedWindow, PaneInfo> key = KV.of(window, paneInfo);
+        writer = windowedWriters.get(key);
+        if (writer == null) {
+          LOG.info("Opening writer for write operation {}, window {}", writeOperation, window);
+          writer = writeOperation.createWriter();
+          writer.openWindowed(UUID.randomUUID().toString(), window, paneInfo, UNKNOWN_SHARDNUM);
+          windowedWriters.put(key, writer);
+          LOG.debug("Done opening writer {} for operation {} window {}", writer, writeOperation,
+              window);
+        }
+      } else {
+        // Unwindowed writes. Just cache a single writer for the bundle.
+        writer = this.writer;
+        if (writer == null) {
+          LOG.info("Opening writer for write operation {}", writeOperation);
+          writer = this.writer = writeOperation.createWriter();
+          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
+          LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
         }
         this.window = window;
         LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
       }
+      return writer;
+    }
+
+    @StartBundle
+    public void startBundle(StartBundleContext c) {
+      // Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
+      writer = null;
+      if (windowedWrites) {
+        windowedWriters = Maps.newHashMap();
+      }
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      Writer<T> writer = getWriter(window, c.pane());
       try {
         writer.write(c.element());
       } catch (Exception e) {
@@ -292,6 +327,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
         c.output(result, window.maxTimestamp(), window);
         // Reset state in case of reuse.
         writer = null;
+      } else {
+        for (Map.Entry<KV<BoundedWindow, PaneInfo>, Writer<T>> entry :
+            windowedWriters.entrySet()) {
+          FileResult result = entry.getValue().close();
+          BoundedWindow window = entry.getKey().getKey();
+          c.output(result, window.maxTimestamp(), window);
+        }
       }
     }
 
@@ -308,24 +350,16 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
    * @see WriteBundles
    */
   private class WriteShardedBundles extends DoFn<KV<Integer, Iterable<T>>, FileResult> {
-    private final PCollectionView<Integer> numShardsView;
-
-    WriteShardedBundles(PCollectionView<Integer> numShardsView) {
-      this.numShardsView = numShardsView;
-    }
-
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
       // In a sharded write, single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       LOG.info("Opening writer for write operation {}", writeOperation);
       Writer<T> writer = writeOperation.createWriter();
       if (windowedWrites) {
-        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
-            numShards);
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey());
       } else {
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
       }
       LOG.debug("Done opening writer {} for operation {}", writer, writeOperation);
 
@@ -378,7 +412,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
 
     @ProcessElement
     public void processElement(ProcessContext context) {
-      int shardCount = 0;
+      final int shardCount;
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
       } else {
@@ -448,35 +482,33 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
     // initial ParDo.
     PCollection<FileResult> results;
     final PCollectionView<Integer> numShardsView;
+    Coder<BoundedWindow> shardedWindowCoder =
+        (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
     if (computeNumShards == null && numShardsProvider == null) {
-      if (windowedWrites) {
-        throw new IllegalStateException("When doing windowed writes, numShards must be set"
-            + "explicitly to a positive value");
-      }
       numShardsView = null;
-      results = input
-          .apply("WriteBundles",
-              ParDo.of(new WriteBundles()));
+      results = input.apply("WriteBundles", ParDo.of(new WriteBundles()));
     } else {
+      List<PCollectionView<?>> sideInputs = Lists.newArrayList();
       if (computeNumShards != null) {
         numShardsView = input.apply(computeNumShards);
-        results  = input
-            .apply("ApplyShardLabel", ParDo.of(
-                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles(numShardsView))
-                    .withSideInputs(numShardsView));
+        sideInputs.add(numShardsView);
       } else {
         numShardsView = null;
-        results = input
-            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
-            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-            .apply("WriteShardedBundles",
-                ParDo.of(new WriteShardedBundles(null)));
       }
+
+      PCollection<KV<Integer, Iterable<T>>> sharded =
+          input
+              .apply("ApplyShardLabel", ParDo.of(
+                  new ApplyShardingKey<T>(numShardsView,
+                      (numShardsView != null) ? null : numShardsProvider))
+                  .withSideInputs(sideInputs))
+              .apply("GroupIntoShards", GroupByKey.<Integer, T>create());
+      shardedWindowCoder =
+          (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
+
+      results = sharded.apply("WriteShardedBundles", ParDo.of(new WriteShardedBundles()));
     }
-    results.setCoder(FileResultCoder.of());
+    results.setCoder(FileResultCoder.of(shardedWindowCoder));
 
     if (windowedWrites) {
       // When processing streaming windowed writes, results will arrive multiple times. This
@@ -486,7 +518,8 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
       // whenever new data arrives.
       PCollection<KV<Void, FileResult>> keyedResults =
           results.apply("AttachSingletonKey", WithKeys.<Void, FileResult>of((Void) null));
-      keyedResults.setCoder(KvCoder.of(VoidCoder.of(), FileResultCoder.of()));
+      keyedResults.setCoder(KvCoder.of(VoidCoder.of(),
+          FileResultCoder.of(shardedWindowCoder)));
 
       // Is the continuation trigger sufficient?
       keyedResults
@@ -497,7 +530,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
               LOG.info("Finalizing write operation {}.", writeOperation);
               List<FileResult> results = Lists.newArrayList(c.element().getValue());
               writeOperation.finalize(results);
-              LOG.debug("Done finalizing write operation {}", writeOperation);
+              LOG.debug("Done finalizing write operation");
             }
           }));
     } else {
@@ -543,8 +576,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>, PDone> {
                     extraShardsNeeded, results.size(), minShardsNeeded);
                 for (int i = 0; i < extraShardsNeeded; ++i) {
                   Writer<T> writer = writeOperation.createWriter();
-                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
-                      UNKNOWN_NUMSHARDS);
+                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM);
                   FileResult emptyWrite = writer.close();
                   results.add(emptyWrite);
                 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2d5fbf6a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index c3f2a58..caad759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -67,8 +67,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class FileBasedSinkTest {
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   private final String tempDirectoryName = "temp";
 
@@ -88,13 +87,13 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Writer opens the correct file, writes the header, footer, and elements in the
-   * correct order, and returns the correct filename.
+   * Writer opens the correct file, writes the header, footer, and elements in the correct
+   * order, and returns the correct filename.
    */
   @Test
   public void testWriter() throws Exception {
     String testUid = "testId";
-    ResourceId expectedFile = getBaseTempDirectory()
+    ResourceId expectedTempFile = getBaseTempDirectory()
         .resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
     List<String> values = Arrays.asList("sympathetic vulture", "boresome hummingbird");
     List<String> expected = new ArrayList<>();
@@ -104,14 +103,15 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.openUnwindowed(testUid, -1, -1);
+    writer.openUnwindowed(testUid, -1);
     for (String value : values) {
       writer.write(value);
     }
     FileResult result = writer.close();
 
-    assertEquals(expectedFile, result.getFilename());
-    assertFileContains(expected, expectedFile);
+    FileBasedSink sink = writer.getWriteOperation().getSink();
+    assertEquals(expectedTempFile, result.getTempFilename());
+    assertFileContains(expected, expectedTempFile);
   }
 
   /**
@@ -131,9 +131,7 @@ public class FileBasedSinkTest {
     }
   }
 
-  /**
-   * Write lines to a file.
-   */
+  /** Write lines to a file. */
   private void writeFile(List<String> lines, File file) throws Exception {
     try (PrintWriter writer = new PrintWriter(new FileOutputStream(file))) {
       for (String line : lines) {
@@ -150,18 +148,14 @@ public class FileBasedSinkTest {
     testRemoveTemporaryFiles(3, getBaseTempDirectory());
   }
 
-  /**
-   * Finalize copies temporary files to output files and removes any temporary files.
-   */
+  /** Finalize copies temporary files to output files and removes any temporary files. */
   @Test
   public void testFinalize() throws Exception {
     List<File> files = generateTemporaryFilesForFinalize(3);
     runFinalize(buildWriteOperation(), files);
   }
 
-  /**
-   * Finalize can be called repeatedly.
-   */
+  /** Finalize can be called repeatedly. */
   @Test
   public void testFinalizeMultipleCalls() throws Exception {
     List<File> files = generateTemporaryFilesForFinalize(3);
@@ -170,9 +164,7 @@ public class FileBasedSinkTest {
     runFinalize(writeOp, files);
   }
 
-  /**
-   * Finalize can be called when some temporary files do not exist and output files exist.
-   */
+  /** Finalize can be called when some temporary files do not exist and output files exist. */
   @Test
   public void testFinalizeWithIntermediateState() throws Exception {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
@@ -186,9 +178,7 @@ public class FileBasedSinkTest {
     runFinalize(writeOp, files);
   }
 
-  /**
-   * Generate n temporary files using the temporary file pattern of Writer.
-   */
+  /** Generate n temporary files using the temporary file pattern of Writer. */
   private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception {
     List<File> temporaryFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
@@ -203,18 +193,20 @@ public class FileBasedSinkTest {
     return temporaryFiles;
   }
 
-  /**
-   * Finalize and verify that files are copied and temporary files are optionally removed.
-   */
+  /** Finalize and verify that files are copied and temporary files are optionally removed. */
   private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
       throws Exception {
     int numFiles = temporaryFiles.size();
 
     List<FileResult> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
-    for (File f : temporaryFiles) {
-      ResourceId file = LocalResources.fromFile(f, false);
-      fileResults.add(new FileResult(file, null));
+    for (int i = 0; i < numFiles; i++) {
+      fileResults.add(
+          new FileResult(
+              LocalResources.fromFile(temporaryFiles.get(i), false),
+              WriteFiles.UNKNOWN_SHARDNUM,
+              null,
+              null));
     }
 
     writeOp.finalize(fileResults);
@@ -233,8 +225,8 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Create n temporary and output files and verify that removeTemporaryFiles only
-   * removes temporary files.
+   * Create n temporary and output files and verify that removeTemporaryFiles only removes temporary
+   * files.
    */
   private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
@@ -276,9 +268,7 @@ public class FileBasedSinkTest {
     }
   }
 
-  /**
-   * Output files are copied to the destination location with the correct names and contents.
-   */
+  /** Output files are copied to the destination location with the correct names and contents. */
   @Test
   public void testCopyToOutputFiles() throws Exception {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
@@ -329,6 +319,7 @@ public class FileBasedSinkTest {
   /**
    * Output filenames are generated correctly when an extension is supplied.
    */
+
   @Test
   public void testGenerateOutputFilenames() {
     List<ResourceId> expected;
@@ -357,9 +348,7 @@ public class FileBasedSinkTest {
     assertEquals(expected, actual);
   }
 
-  /**
-   * Reject non-distinct output filenames.
-   */
+  /** Reject non-distinct output filenames. */
   @Test
   public void testCollidingOutputFilenames() throws IOException {
     ResourceId root = getBaseOutputDirectory();
@@ -372,22 +361,19 @@ public class FileBasedSinkTest {
     ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult> results = Lists.newArrayList(
-          new FileResult(temp1, output),
-          new FileResult(temp2, output),
-          new FileResult(temp3, output));
-
+      Iterable<FileResult> results =
+          Lists.newArrayList(
+              new FileResult(temp1, 1, null, null),
+              new FileResult(temp2, 1, null, null),
+              new FileResult(temp3, 1, null, null));
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
-      assertEquals("Only generated 1 distinct file names for 3 files.",
-                   exn.getMessage());
+      assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage());
     }
   }
 
-  /**
-   * Output filenames are generated correctly when an extension is not supplied.
-   */
+  /** Output filenames are generated correctly when an extension is not supplied. */
   @Test
   public void testGenerateOutputFilenamesWithoutExtension() {
     List<ResourceId> expected;
@@ -415,53 +401,59 @@ public class FileBasedSinkTest {
     assertEquals(expected, actual);
   }
 
-  /**
-   * {@link CompressionType#BZIP2} correctly writes BZip2 data.
-   */
+  /** {@link CompressionType#BZIP2} correctly writes BZip2 data. */
   @Test
   public void testCompressionTypeBZIP2() throws FileNotFoundException, IOException {
     final File file =
         writeValuesWithWritableByteChannelFactory(CompressionType.BZIP2, "abc", "123");
     // Read Bzip2ed data back in using Apache commons API (de facto standard).
-    assertReadValues(new BufferedReader(new InputStreamReader(
-        new BZip2CompressorInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
-        "abc", "123");
+    assertReadValues(
+        new BufferedReader(
+            new InputStreamReader(
+                new BZip2CompressorInputStream(new FileInputStream(file)),
+                StandardCharsets.UTF_8.name())),
+        "abc",
+        "123");
   }
 
-  /**
-   * {@link CompressionType#GZIP} correctly writes Gzipped data.
-   */
+  /** {@link CompressionType#GZIP} correctly writes Gzipped data. */
   @Test
   public void testCompressionTypeGZIP() throws FileNotFoundException, IOException {
     final File file = writeValuesWithWritableByteChannelFactory(CompressionType.GZIP, "abc", "123");
     // Read Gzipped data back in using standard API.
-    assertReadValues(new BufferedReader(new InputStreamReader(
-        new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc",
+    assertReadValues(
+        new BufferedReader(
+            new InputStreamReader(
+                new GZIPInputStream(new FileInputStream(file)), StandardCharsets.UTF_8.name())),
+        "abc",
         "123");
   }
 
-  /**
-   * {@link CompressionType#DEFLATE} correctly writes deflate data.
-   */
+  /** {@link CompressionType#DEFLATE} correctly writes deflate data. */
   @Test
   public void testCompressionTypeDEFLATE() throws FileNotFoundException, IOException {
-    final File file = writeValuesWithWritableByteChannelFactory(
-        CompressionType.DEFLATE, "abc", "123");
+    final File file =
+        writeValuesWithWritableByteChannelFactory(CompressionType.DEFLATE, "abc", "123");
     // Read Gzipped data back in using standard API.
-    assertReadValues(new BufferedReader(new InputStreamReader(new DeflateCompressorInputStream(
-        new FileInputStream(file)), StandardCharsets.UTF_8.name())), "abc", "123");
+    assertReadValues(
+        new BufferedReader(
+            new InputStreamReader(
+                new DeflateCompressorInputStream(new FileInputStream(file)),
+                StandardCharsets.UTF_8.name())),
+        "abc",
+        "123");
   }
 
-  /**
-   * {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data.
-   */
+  /** {@link CompressionType#UNCOMPRESSED} correctly writes uncompressed data. */
   @Test
   public void testCompressionTypeUNCOMPRESSED() throws FileNotFoundException, IOException {
     final File file =
         writeValuesWithWritableByteChannelFactory(CompressionType.UNCOMPRESSED, "abc", "123");
     // Read uncompressed data back in using standard API.
-    assertReadValues(new BufferedReader(new InputStreamReader(
-        new FileInputStream(file), StandardCharsets.UTF_8.name())), "abc",
+    assertReadValues(
+        new BufferedReader(
+            new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8.name())),
+        "abc",
         "123");
   }
 
@@ -487,8 +479,8 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * {@link Writer} writes to the {@link WritableByteChannel} provided by
-   * {@link DrunkWritableByteChannelFactory}.
+   * {@link Writer} writes to the {@link WritableByteChannel} provided by {@link
+   * DrunkWritableByteChannelFactory}.
    */
   @Test
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
@@ -511,18 +503,16 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1, -1);
+    writer.openUnwindowed(testUid, -1);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();
 
-    assertEquals(expectedFile, result.getFilename());
+    assertEquals(expectedFile, result.getTempFilename());
     assertFileContains(expected, expectedFile);
   }
 
-  /**
-   * Build a SimpleSink with default options.
-   */
+  /** Build a SimpleSink with default options. */
   private SimpleSink buildSink() {
     return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
   }
@@ -535,9 +525,7 @@ public class FileBasedSinkTest {
     return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
   }
 
-  /**
-   * Build a write operation with the default options for it and its parent sink.
-   */
+  /** Build a write operation with the default options for it and its parent sink. */
   private SimpleSink.SimpleWriteOperation buildWriteOperation() {
     return buildSink().createWriteOperation();
   }