You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:31:19 UTC

[28/50] [abbrv] beam git commit: WriteFiles improvements:

WriteFiles improvements:

* Clearer and more consistent shard number assignment logic
* Uses Reshuffle instead of GBK in finalize


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2eea9fee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2eea9fee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2eea9fee

Branch: refs/heads/tez-runner
Commit: 2eea9fee30e7daf8dbc9efa3157ed69a675b6ae1
Parents: 9f780c9
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Nov 13 11:58:11 2017 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Nov 14 19:23:53 2017 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoTranslator.java       |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../construction/WriteFilesTranslationTest.java |   1 -
 .../beam/runners/spark/io/AvroPipelineTest.java |  12 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 169 ++++-----
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 375 ++++++++++++-------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   |  77 ++--
 .../org/apache/beam/sdk/io/TextIOWriteTest.java |  84 +++--
 .../org/apache/beam/sdk/io/WriteFilesTest.java  |  47 ++-
 9 files changed, 451 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
index dd4bd67..6a052d1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java
@@ -213,7 +213,7 @@ class ParDoTranslator<InputT, OutputT>
             sideInputCollection.getWindowingStrategy());
       }
       if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
-        String msg = "Multiple side inputs with different coders.";
+        String msg = context.getFullName() + ": Multiple side inputs with different coders.";
         throw new UnsupportedOperationException(msg);
       }
       sourceCollections.add(context.<PCollection<Object>>getViewInput(sideInput));

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index ba75746..e050c15 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -108,7 +108,7 @@ public class WordCountTest {
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))
-      .apply("WriteCounts", TextIO.write().to(options.getOutput()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2))
       ;
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 4bc61d4..ccb366e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -64,7 +64,6 @@ public class WriteFilesTranslationTest {
     public static Iterable<WriteFiles<Object, Void, Object>> data() {
       return ImmutableList.of(
           WriteFiles.to(new DummySink()),
-          WriteFiles.to(new DummySink()).withWindowedWrites(),
           WriteFiles.to(new DummySink()).withNumShards(17),
           WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index adde8d2..e17a6b8 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -47,7 +47,7 @@ import org.junit.rules.TemporaryFolder;
 public class AvroPipelineTest {
 
   private File inputFile;
-  private File outputDir;
+  private File outputFile;
 
   @Rule
   public final TemporaryFolder tmpDir = new TemporaryFolder();
@@ -58,8 +58,7 @@ public class AvroPipelineTest {
   @Before
   public void setUp() throws IOException {
     inputFile = tmpDir.newFile("test.avro");
-    outputDir = tmpDir.newFolder("out");
-    outputDir.delete();
+    outputFile = new File(tmpDir.getRoot(), "out.avro");
   }
 
   @Test
@@ -73,7 +72,10 @@ public class AvroPipelineTest {
 
     PCollection<GenericRecord> input = pipeline.apply(
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
-    input.apply(AvroIO.writeGenericRecords(schema).to(outputDir.getAbsolutePath()));
+    input.apply(
+        AvroIO.writeGenericRecords(schema)
+            .to(outputFile.getAbsolutePath())
+            .withoutSharding());
     pipeline.run();
 
     List<GenericRecord> records = readGenericFile();
@@ -98,7 +100,7 @@ public class AvroPipelineTest {
     List<GenericRecord> records = Lists.newArrayList();
     GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
     try (DataFileReader<GenericRecord> dataFileReader =
-             new DataFileReader<>(new File(outputDir + "-00000-of-00001"), genericDatumReader)) {
+        new DataFileReader<>(outputFile, genericDatumReader)) {
       for (GenericRecord record : dataFileReader) {
         records.add(record);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 830e16b..d4cb57d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -44,6 +44,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
@@ -74,6 +75,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
@@ -123,6 +125,7 @@ import org.slf4j.LoggerFactory;
 public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     implements Serializable, HasDisplayData {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSink.class);
+  static final String TEMP_DIRECTORY_PREFIX = ".temp-beam";
 
   /** @deprecated use {@link Compression}. */
   @Deprecated
@@ -511,7 +514,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         implements SerializableFunction<ResourceId, ResourceId> {
       private static final AtomicLong TEMP_COUNT = new AtomicLong(0);
       private static final DateTimeFormatter TEMPDIR_TIMESTAMP =
-          DateTimeFormat.forPattern("yyyy-MM-DD_HH-mm-ss");
+          DateTimeFormat.forPattern("yyyy-MM-dd_HH-mm-ss");
       // The intent of the code is to have a consistent value of tempDirectory across
       // all workers, which wouldn't happen if now() was called inline.
       private final String timestamp = Instant.now().toString(TEMPDIR_TIMESTAMP);
@@ -522,7 +525,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       @Override
       public ResourceId apply(ResourceId tempDirectory) {
         // Temp directory has a timestamp and a unique ID
-        String tempDirName = String.format(".temp-beam-%s-%s", timestamp, tempId);
+        String tempDirName = String.format(TEMP_DIRECTORY_PREFIX + "-%s-%s", timestamp, tempId);
         return tempDirectory
             .getCurrentDirectory()
             .resolve(tempDirName, StandardResolveOptions.RESOLVE_DIRECTORY);
@@ -558,30 +561,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       this.windowedWrites = windowedWrites;
     }
 
-    /**
-     * Finalizes writing by copying temporary output files to their final location.
-     *
-     * <p>Finalization may be overridden by subclass implementations to perform customized
-     * finalization (e.g., initiating some operation on output bundles, merging them, etc.). {@code
-     * writerResults} contains the filenames of written bundles.
-     *
-     * <p>If subclasses override this method, they must guarantee that its implementation is
-     * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
-     * is a best practice to attempt to try to make this method atomic.
-     *
-     * <p>Returns the map of temporary files generated to final filenames. Callers must call {@link
-     * #removeTemporaryFiles(Set)} to cleanup the temporary files.
-     *
-     * @param writerResults the results of writes (FileResult).
-     */
-    public Map<ResourceId, ResourceId> finalize(Iterable<FileResult<DestinationT>> writerResults)
-        throws Exception {
-      // Collect names of temporary files and copies them.
-      Map<ResourceId, ResourceId> outputFilenames = buildOutputFilenames(writerResults);
-      copyToOutputFiles(outputFilenames);
-      return outputFilenames;
-    }
-
     /*
      * Remove temporary files after finalization.
      *
@@ -603,34 +582,52 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     }
 
     @Experimental(Kind.FILESYSTEM)
-    protected final Map<ResourceId, ResourceId> buildOutputFilenames(
+    protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames(
+        @Nullable DestinationT dest,
+        @Nullable BoundedWindow window,
+        @Nullable Integer numShards,
         Iterable<FileResult<DestinationT>> writerResults) {
-      int numShards = Iterables.size(writerResults);
-      Map<ResourceId, ResourceId> outputFilenames = Maps.newHashMap();
-
-      // Either all results have a shard number set (if the sink is configured with a fixed
-      // number of shards), or they all don't (otherwise).
-      Boolean isShardNumberSetEverywhere = null;
-      for (FileResult<DestinationT> result : writerResults) {
-        boolean isShardNumberSetHere = (result.getShard() != UNKNOWN_SHARDNUM);
-        if (isShardNumberSetEverywhere == null) {
-          isShardNumberSetEverywhere = isShardNumberSetHere;
-        } else {
-          checkArgument(
-              isShardNumberSetEverywhere == isShardNumberSetHere,
-              "Found a mix of files with and without shard number set: %s",
-              result);
-        }
+      for (FileResult<DestinationT> res : writerResults) {
+        checkArgument(
+            Objects.equals(dest, res.getDestination()),
+            "File result has wrong destination: expected %s, got %s",
+            dest, res.getDestination());
+        checkArgument(
+            Objects.equals(window, res.getWindow()),
+            "File result has wrong window: expected %s, got %s",
+            window, res.getWindow());
       }
+      List<KV<FileResult<DestinationT>, ResourceId>> outputFilenames = Lists.newArrayList();
 
-      if (isShardNumberSetEverywhere == null) {
-        isShardNumberSetEverywhere = true;
+      final int effectiveNumShards;
+      if (numShards != null) {
+        effectiveNumShards = numShards;
+        for (FileResult<DestinationT> res : writerResults) {
+          checkArgument(
+              res.getShard() != UNKNOWN_SHARDNUM,
+              "Fixed sharding into %s shards was specified, "
+                  + "but file result %s does not specify a shard",
+              numShards,
+              res);
+        }
+      } else {
+        effectiveNumShards = Iterables.size(writerResults);
+        for (FileResult<DestinationT> res : writerResults) {
+          checkArgument(
+              res.getShard() == UNKNOWN_SHARDNUM,
+              "Runner-chosen sharding was specified, "
+                  + "but file result %s explicitly specifies a shard",
+              res);
+        }
       }
 
       List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
-      if (isShardNumberSetEverywhere) {
+      if (numShards != null) {
         resultsWithShardNumbers = Lists.newArrayList(writerResults);
       } else {
+        checkState(
+            !windowedWrites,
+            "When doing windowed writes, shards should have been assigned when writing");
         // Sort files for idempotence. Sort by temporary filename.
         // Note that this codepath should not be used when processing triggered windows. In the
         // case of triggers, the list of FileResult objects in the Finalize iterable is not
@@ -653,23 +650,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         }
       }
 
+      Map<ResourceId, FileResult<DestinationT>> distinctFilenames = Maps.newHashMap();
       for (FileResult<DestinationT> result : resultsWithShardNumbers) {
         checkArgument(
             result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
-        outputFilenames.put(
-            result.getTempFilename(),
-            result.getDestinationFile(
-                getSink().getDynamicDestinations(),
-                numShards,
-                getSink().getWritableByteChannelFactory()));
+        ResourceId finalFilename = result.getDestinationFile(
+            getSink().getDynamicDestinations(),
+            effectiveNumShards,
+            getSink().getWritableByteChannelFactory());
+        checkArgument(
+            !distinctFilenames.containsKey(finalFilename),
+            "Filename policy must generate unique filenames, but generated the same name %s "
+                + "for file results %s and %s",
+            finalFilename,
+            result,
+            distinctFilenames.get(finalFilename));
+        distinctFilenames.put(finalFilename, result);
+        outputFilenames.add(KV.of(result, finalFilename));
       }
-
-      int numDistinctShards = new HashSet<>(outputFilenames.values()).size();
-      checkState(
-          numDistinctShards == outputFilenames.size(),
-          "Only generated %s distinct file names for %s files.",
-          numDistinctShards,
-          outputFilenames.size());
       return outputFilenames;
     }
 
@@ -684,24 +682,23 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * the policy) is "dir/file", the extension is ".txt", and the fileNamingTemplate is
      * "-SSS-of-NNN", the contents of A will be copied to dir/file-000-of-003.txt, the contents of B
      * will be copied to dir/file-001-of-003.txt, etc.
-     *
-     * @param filenames the filenames of temporary files.
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
-    final void copyToOutputFiles(Map<ResourceId, ResourceId> filenames) throws IOException {
-      int numFiles = filenames.size();
+    final void copyToOutputFiles(
+        List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
+      int numFiles = resultsToFinalFilenames.size();
       if (numFiles > 0) {
         LOG.debug("Copying {} files.", numFiles);
-        List<ResourceId> srcFiles = new ArrayList<>(filenames.size());
-        List<ResourceId> dstFiles = new ArrayList<>(filenames.size());
-        for (Map.Entry<ResourceId, ResourceId> srcDestPair : filenames.entrySet()) {
-          srcFiles.add(srcDestPair.getKey());
-          dstFiles.add(srcDestPair.getValue());
+        List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
+        List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+        for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+          srcFiles.add(entry.getKey().getTempFilename());
+          dstFiles.add(entry.getValue());
           LOG.info(
               "Will copy temporary file {} to final location {}",
-              srcDestPair.getKey(),
-              srcDestPair.getValue());
+              entry.getKey().getTempFilename(),
+              entry.getValue());
         }
         // During a failure case, files may have been deleted in an earlier step. Thus
         // we ignore missing files here.
@@ -732,7 +729,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
       // This may still fail to remove temporary outputs of some failed bundles, but at least
       // the common case (where all bundles succeed) is guaranteed to be fully addressed.
-      Set<ResourceId> matches = new HashSet<>();
+      Set<ResourceId> allMatches = new HashSet<>(knownFiles);
+      for (ResourceId match : allMatches) {
+        LOG.info("Will remove known temporary file {}", match);
+      }
       // TODO: Windows OS cannot resolves and matches '*' in the path,
       // ignore the exception for now to avoid failing the pipeline.
       if (shouldRemoveTemporaryDirectory) {
@@ -741,29 +741,24 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
               Iterables.getOnlyElement(
                   FileSystems.match(Collections.singletonList(tempDir.toString() + "*")));
           for (Metadata matchResult : singleMatch.metadata()) {
-            matches.add(matchResult.resourceId());
-            LOG.info("Will remove temporary file {}", matchResult.resourceId());
+            if (allMatches.add(matchResult.resourceId())) {
+              LOG.info("Will also remove unknown temporary file {}", matchResult.resourceId());
+            }
           }
         } catch (Exception e) {
           LOG.warn("Failed to match temporary files under: [{}].", tempDir);
         }
       }
-      Set<ResourceId> allMatches = new HashSet<>(matches);
-      allMatches.addAll(knownFiles);
-      LOG.debug(
-          "Removing {} temporary files found under {} ({} matched glob, {} known files)",
-          allMatches.size(),
-          tempDir,
-          matches.size(),
-          allMatches.size() - matches.size());
       FileSystems.delete(allMatches, StandardMoveOptions.IGNORE_MISSING_FILES);
 
-      // Deletion of the temporary directory might fail, if not all temporary files are removed.
-      try {
-        FileSystems.delete(
-            Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
-      } catch (Exception e) {
-        LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
+      if (shouldRemoveTemporaryDirectory) {
+        // Deletion of the temporary directory might fail, if not all temporary files are removed.
+        try {
+          FileSystems.delete(
+              Collections.singletonList(tempDir), StandardMoveOptions.IGNORE_MISSING_FILES);
+        } catch (Exception e) {
+          LOG.warn("Failed to remove temporary directory: [{}].", tempDir);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e37e78..8328d7b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -25,6 +25,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
@@ -32,6 +33,7 @@ import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -62,6 +64,8 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -180,12 +184,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       checkArgument(windowedWrites,
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
+    }
+    if (windowedWrites) {
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
       // and similar behavior in other runners.
       checkArgument(
           computeNumShards != null || numShardsProvider != null,
-          "When applying %s to an unbounded PCollection, "
-              + "must specify number of output shards explicitly",
+          "When using windowed writes, must specify number of output shards explicitly",
           WriteFiles.class.getSimpleName());
     }
     this.writeOperation = sink.createWriteOperation();
@@ -502,15 +507,16 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         if (writer == null) {
           LOG.debug("Opening writer for write operation {}", writeOperation);
           writer = writeOperation.createWriter();
+          int shardNumber =
+              shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+                  ? c.element().getKey().getShardNumber()
+                  : UNKNOWN_SHARDNUM;
           if (windowedWrites) {
-            int shardNumber =
-                shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                    ? c.element().getKey().getShardNumber()
-                    : UNKNOWN_SHARDNUM;
             writer.openWindowed(
                 UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
           } else {
-            writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
+            writer.openUnwindowed(
+                UUID.randomUUID().toString(), shardNumber, destination);
           }
           LOG.debug("Done opening writer");
           writers.put(destination, writer);
@@ -532,7 +538,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           throw e;
         }
       }
-      }
+    }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
@@ -615,13 +621,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  Multimap<DestinationT, FileResult<DestinationT>> perDestinationResults(
-      Iterable<FileResult<DestinationT>> results) {
-    Multimap<DestinationT, FileResult<DestinationT>> perDestination = ArrayListMultimap.create();
+  private static <DestinationT>
+      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>>
+          groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) {
+    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
+        ArrayListMultimap.create();
     for (FileResult<DestinationT> result : results) {
-      perDestination.put(result.getDestination(), result);
+      res.put(KV.of(result.getDestination(), result.getWindow()), result);
     }
-    return perDestination;
+    return res;
   }
 
   /**
@@ -752,51 +760,28 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     PCollection<KV<DestinationT, String>> outputFilenames;
     if (windowedWrites) {
-      // When processing streaming windowed writes, results will arrive multiple times. This
-      // means we can't share the below implementation that turns the results into a side input,
-      // as new data arriving into a side input does not trigger the listening DoFn. Instead
-      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
-      // whenever new data arrives.
-      PCollection<KV<Void, FileResult<DestinationT>>> keyedResults =
-          results.apply(
-              "AttachSingletonKey", WithKeys.<Void, FileResult<DestinationT>>of((Void) null));
-      keyedResults.setCoder(
-          KvCoder.of(VoidCoder.of(), FileResultCoder.of(shardedWindowCoder, destinationCoder)));
-
-      // Is the continuation trigger sufficient?
+      // We need to materialize the FileResult's before the renaming stage: this can be done either
+      // via a side input or via a GBK. However, when processing streaming windowed writes, results
+      // will arrive multiple times. This means we can't share the below implementation that turns
+      // the results into a side input, as new data arriving into a side input does not trigger the
+      // listening DoFn. We also can't use a GBK because we need only the materialization, but not
+      // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK
+      // would give. So, we use a reshuffle (over a single key to maximize bundling).
       outputFilenames =
-          keyedResults
-              .apply("FinalizeGroupByKey", GroupByKey.<Void, FileResult<DestinationT>>create())
+          results
+              .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
+              .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder()))
+              .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
+              .apply(Values.<FileResult<DestinationT>>create())
               .apply(
                   "FinalizeWindowed",
                   ParDo.of(
-                      new DoFn<
-                          KV<Void, Iterable<FileResult<DestinationT>>>,
-                          KV<DestinationT, String>>() {
-                        @ProcessElement
-                        public void processElement(ProcessContext c) throws Exception {
-                          Set<ResourceId> tempFiles = Sets.newHashSet();
-                          Multimap<DestinationT, FileResult<DestinationT>> results =
-                              perDestinationResults(c.element().getValue());
-                          for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
-                              results.asMap().entrySet()) {
-                            LOG.info(
-                                "Finalizing write operation {} for destination {} num shards: {}.",
-                                writeOperation,
-                                entry.getKey(),
-                                entry.getValue().size());
-                            Map<ResourceId, ResourceId> finalizeMap =
-                                writeOperation.finalize(entry.getValue());
-                            tempFiles.addAll(finalizeMap.keySet());
-                            for (ResourceId outputFile : finalizeMap.values()) {
-                              c.output(KV.of(entry.getKey(), outputFile.toString()));
-                            }
-                            LOG.debug("Done finalizing write operation for {}.", entry.getKey());
-                          }
-                          writeOperation.removeTemporaryFiles(tempFiles);
-                          LOG.debug("Removed temporary files for {}.", writeOperation);
-                        }
-                      }))
+                          new FinalizeWindowedFn<DestinationT>(
+                              numShardsView, numShardsProvider, writeOperation))
+                      .withSideInputs(
+                          numShardsView == null
+                              ? ImmutableList.<PCollectionView<?>>of()
+                              : ImmutableList.of(numShardsView)))
               .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     } else {
       final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
@@ -817,58 +802,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // set numShards, then all shards will be written out as empty files. For this reason we
       // use a side input here.
       PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
-      outputFilenames = singletonCollection.apply(
-          "FinalizeUnwindowed",
-          ParDo.of(
-                  new DoFn<Void, KV<DestinationT, String>>() {
-                    @ProcessElement
-                    public void processElement(ProcessContext c) throws Exception {
-                      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-                      // We must always output at least 1 shard, and honor user-specified numShards
-                      // if set.
-                      int minShardsNeeded;
-                      if (numShardsView != null) {
-                        minShardsNeeded = c.sideInput(numShardsView);
-                      } else if (numShardsProvider != null) {
-                        minShardsNeeded = numShardsProvider.get();
-                      } else {
-                        minShardsNeeded = 1;
-                      }
-                      Set<ResourceId> tempFiles = Sets.newHashSet();
-                      Multimap<DestinationT, FileResult<DestinationT>> perDestination =
-                          perDestinationResults(c.sideInput(resultsView));
-                      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> entry :
-                          perDestination.asMap().entrySet()) {
-                        Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap();
-                        finalizeMap.putAll(
-                            finalizeForDestinationFillEmptyShards(
-                                entry.getKey(), entry.getValue(), minShardsNeeded));
-                        tempFiles.addAll(finalizeMap.keySet());
-                        for (ResourceId outputFile :finalizeMap.values()) {
-                          c.output(KV.of(entry.getKey(), outputFile.toString()));
-                        }
-                      }
-                      if (perDestination.isEmpty()) {
-                        // If there is no input at all, write empty files to the default
-                        // destination.
-                        Map<ResourceId, ResourceId> finalizeMap = Maps.newHashMap();
-                        DestinationT destination =
-                            getSink().getDynamicDestinations().getDefaultDestination();
-                        finalizeMap.putAll(
-                            finalizeForDestinationFillEmptyShards(
-                                destination,
-                                Lists.<FileResult<DestinationT>>newArrayList(),
-                                minShardsNeeded));
-                        tempFiles.addAll(finalizeMap.keySet());
-                        for (ResourceId outputFile :finalizeMap.values()) {
-                          c.output(KV.of(destination, outputFile.toString()));
-                        }
-                      }
-                      writeOperation.removeTemporaryFiles(tempFiles);
-                    }
-                  })
-              .withSideInputs(finalizeSideInputs.build()))
-            .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+      outputFilenames =
+          singletonCollection
+              .apply(
+                  "FinalizeUnwindowed",
+                  ParDo.of(
+                          new FinalizeUnwindowedFn<>(
+                              numShardsView, numShardsProvider, resultsView, writeOperation))
+                      .withSideInputs(finalizeSideInputs.build()))
+              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     }
 
     TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
@@ -879,41 +821,196 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         outputFilenames);
   }
 
-  /**
-   * Finalize a list of files for a single destination. If a minimum number of shards is needed,
-   * this function will generate empty files for this destination to ensure that all shards are
-   * generated.
-   */
-  private Map<ResourceId, ResourceId> finalizeForDestinationFillEmptyShards(
-      DestinationT destination, Collection<FileResult<DestinationT>> results, int minShardsNeeded)
-      throws Exception {
-    checkState(!windowedWrites);
-
-    LOG.info(
-        "Finalizing write operation {} for destination {} num shards {}.",
-        writeOperation,
-        destination,
-        results.size());
-    int extraShardsNeeded = minShardsNeeded - results.size();
-    if (extraShardsNeeded > 0) {
+  private static class FinalizeWindowedFn<DestinationT>
+      extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
+    @Nullable private final PCollectionView<Integer> numShardsView;
+    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final WriteOperation<DestinationT, ?> writeOperation;
+
+    @Nullable private transient List<FileResult<DestinationT>> fileResults;
+    @Nullable private Integer fixedNumShards;
+
+    public FinalizeWindowedFn(
+        @Nullable PCollectionView<Integer> numShardsView,
+        @Nullable ValueProvider<Integer> numShardsProvider,
+        WriteOperation<DestinationT, ?> writeOperation) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      this.writeOperation = writeOperation;
+    }
+
+    @StartBundle
+    public void startBundle() {
+      fileResults = Lists.newArrayList();
+      fixedNumShards = null;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      fileResults.add(c.element());
+      if (fixedNumShards == null) {
+        if (numShardsView != null) {
+          fixedNumShards = c.sideInput(numShardsView);
+        } else if (numShardsProvider != null) {
+          fixedNumShards = numShardsProvider.get();
+        } else {
+          fixedNumShards = null;
+        }
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext c) throws Exception {
+      Set<ResourceId> tempFiles = Sets.newHashSet();
+      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results =
+          groupByDestinationAndWindow(fileResults);
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+      for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+          destEntry : results.asMap().entrySet()) {
+        DestinationT destination = destEntry.getKey().getKey();
+        BoundedWindow window = destEntry.getKey().getValue();
+        resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames(
+            destination, window, fixedNumShards, destEntry.getValue()));
+      }
+      LOG.info("Will finalize {} files", resultsToFinalFilenames.size());
+      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+        FileResult<DestinationT> res = entry.getKey();
+        tempFiles.add(res.getTempFilename());
+        c.output(
+            KV.of(res.getDestination(), entry.getValue().toString()),
+            res.getWindow().maxTimestamp(),
+            res.getWindow());
+      }
+      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
+      writeOperation.removeTemporaryFiles(tempFiles);
+    }
+  }
+
+  private static class FinalizeUnwindowedFn<DestinationT>
+      extends DoFn<Void, KV<DestinationT, String>> {
+    @Nullable private final PCollectionView<Integer> numShardsView;
+    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
+    private final WriteOperation<DestinationT, ?> writeOperation;
+
+    public FinalizeUnwindowedFn(
+        @Nullable PCollectionView<Integer> numShardsView,
+        @Nullable ValueProvider<Integer> numShardsProvider,
+        PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
+        WriteOperation<DestinationT, ?> writeOperation) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      this.resultsView = resultsView;
+      this.writeOperation = writeOperation;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+      @Nullable Integer fixedNumShards;
+      if (numShardsView != null) {
+        fixedNumShards = c.sideInput(numShardsView);
+      } else if (numShardsProvider != null) {
+        fixedNumShards = numShardsProvider.get();
+      } else {
+        fixedNumShards = null;
+      }
+      Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap =
+          ArrayListMultimap.create();
+      for (FileResult<DestinationT> result : c.sideInput(resultsView)) {
+        resultsByDestMultimap.put(result.getDestination(), result);
+      }
+      Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest =
+          resultsByDestMultimap.asMap();
+      if (resultsByDest.isEmpty()) {
+        Collection<FileResult<DestinationT>> empty = ImmutableList.of();
+        resultsByDest =
+            Collections.singletonMap(
+                writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty);
+      }
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>>
+          destEntry : resultsByDest.entrySet()) {
+        resultsToFinalFilenames.addAll(
+            finalizeForDestinationFillEmptyShards(
+                destEntry.getKey(), fixedNumShards, destEntry.getValue()));
+      }
+      Set<ResourceId> tempFiles = Sets.newHashSet();
+      for (KV<FileResult<DestinationT>, ResourceId> entry :
+          resultsToFinalFilenames) {
+        tempFiles.add(entry.getKey().getTempFilename());
+        c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
+      }
+      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
+      writeOperation.removeTemporaryFiles(tempFiles);
+    }
+
+    /**
+     * Finalize a list of files for a single destination. If a minimum number of shards is needed,
+     * this function will generate empty files for this destination to ensure that all shards are
+     * generated.
+     */
+    private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards(
+        DestinationT destination,
+        @Nullable Integer fixedNumShards,
+        Collection<FileResult<DestinationT>> existingResults)
+        throws Exception {
+      checkState(!writeOperation.windowedWrites);
+
       LOG.info(
-          "Creating {} empty output shards in addition to {} written "
-              + "for a total of {} for destination {}.",
-          extraShardsNeeded,
-          results.size(),
-          minShardsNeeded,
-          destination);
-      for (int i = 0; i < extraShardsNeeded; ++i) {
-        Writer<DestinationT, OutputT> writer = writeOperation.createWriter();
-        // Currently this code path is only called in the unwindowed case.
-        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, destination);
-        FileResult<DestinationT> emptyWrite = writer.close();
-        results.add(emptyWrite);
+          "Finalizing write operation {} for destination {} num shards {}.",
+          writeOperation,
+          destination,
+          existingResults.size());
+      if (fixedNumShards != null) {
+        checkArgument(
+            existingResults.size() <= fixedNumShards,
+            "Fixed sharding into %s shards was specified, but got %s file results",
+            fixedNumShards,
+            existingResults.size());
+      }
+      // We must always output at least 1 shard, and honor user-specified numShards
+      // if set.
+      Set<Integer> missingShardNums;
+      if (fixedNumShards == null) {
+        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+      } else {
+        missingShardNums = Sets.newHashSet();
+        for (int i = 0; i < fixedNumShards; ++i) {
+          missingShardNums.add(i);
+        }
+        for (FileResult<DestinationT> res : existingResults) {
+          checkArgument(
+              res.getShard() != UNKNOWN_SHARDNUM,
+              "Fixed sharding into %s shards was specified, "
+                  + "but file result %s does not specify a shard",
+              fixedNumShards,
+              res);
+          missingShardNums.remove(res.getShard());
+        }
+      }
+      List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults);
+      if (!missingShardNums.isEmpty()) {
+        LOG.info(
+            "Creating {} empty output shards in addition to {} written for destination {}.",
+            missingShardNums.size(),
+            existingResults.size(),
+            destination);
+        for (int shard : missingShardNums) {
+          Writer<DestinationT, ?> writer = writeOperation.createWriter();
+          // Currently this code path is only called in the unwindowed case.
+          writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination);
+          FileResult<DestinationT> emptyWrite = writer.close();
+          completeResults.add(emptyWrite);
+        }
+        LOG.debug("Done creating extra shards for {}.", destination);
       }
-      LOG.debug("Done creating extra shards for {}.", destination);
+      return
+          writeOperation.buildOutputFilenames(
+              destination,
+              null,
+              (fixedNumShards == null) ? null : completeResults.size(),
+              completeResults);
     }
-    Map<ResourceId, ResourceId> finalizeMap = writeOperation.finalize(results);
-    LOG.debug("Done finalizing write operation {} for destination {}", writeOperation, destination);
-    return finalizeMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 0a96b7e..29f3c1b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.io.WriteFiles.UNKNOWN_SHARDNUM;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -25,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,9 +44,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -52,6 +54,7 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.values.KV;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
 import org.junit.Rule;
@@ -97,7 +100,7 @@ public class FileBasedSinkTest {
     expected.addAll(values);
     expected.add(SimpleSink.SimpleWriter.FOOTER);
 
-    SimpleSink.SimpleWriter writer =
+    SimpleSink.SimpleWriter<Void> writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
     writer.openUnwindowed(testUid, -1, null);
     for (String value : values) {
@@ -186,7 +189,7 @@ public class FileBasedSinkTest {
   }
 
   /** Finalize and verify that files are copied and temporary files are optionally removed. */
-  private void runFinalize(SimpleSink.SimpleWriteOperation writeOp, List<File> temporaryFiles)
+  private void runFinalize(SimpleSink.SimpleWriteOperation<Void> writeOp, List<File> temporaryFiles)
       throws Exception {
     int numFiles = temporaryFiles.size();
 
@@ -196,13 +199,21 @@ public class FileBasedSinkTest {
       fileResults.add(
           new FileResult<Void>(
               LocalResources.fromFile(temporaryFiles.get(i), false),
-              WriteFiles.UNKNOWN_SHARDNUM,
+              UNKNOWN_SHARDNUM,
               null,
               null,
               null));
     }
 
-    writeOp.removeTemporaryFiles(writeOp.finalize(fileResults).keySet());
+    // TODO: test with null first argument?
+    List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
+        writeOp.buildOutputFilenames(null, null, null, fileResults);
+    Set<ResourceId> tempFiles = Sets.newHashSet();
+    for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) {
+      tempFiles.add(res.getKey().getTempFilename());
+    }
+    writeOp.copyToOutputFiles(resultsToFinalFilenames);
+    writeOp.removeTemporaryFiles(tempFiles);
 
     for (int i = 0; i < numFiles; i++) {
       ResourceId outputFilename =
@@ -263,14 +274,14 @@ public class FileBasedSinkTest {
   /** Output files are copied to the destination location with the correct names and contents. */
   @Test
   public void testCopyToOutputFiles() throws Exception {
-    SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    SimpleSink.SimpleWriteOperation<Void> writeOp = buildWriteOperation();
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames =
         Arrays.asList("file-00-of-03.test", "file-01-of-03.test", "file-02-of-03.test");
 
-    Map<ResourceId, ResourceId> inputFilePaths = new HashMap<>();
-    List<ResourceId> expectedOutputPaths = new ArrayList<>();
+    List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+    List<ResourceId> expectedOutputPaths = Lists.newArrayList();
 
     for (int i = 0; i < inputFilenames.size(); i++) {
       // Generate output paths.
@@ -282,17 +293,20 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(
-          LocalResources.fromFile(inputTmpFile, false),
-          writeOp
-              .getSink()
-              .getDynamicDestinations()
-              .getFilenamePolicy(null)
-              .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED));
+      ResourceId finalFilename = writeOp
+          .getSink()
+          .getDynamicDestinations()
+          .getFilenamePolicy(null)
+          .unwindowedFilename(i, inputFilenames.size(), CompressionType.UNCOMPRESSED);
+      resultsToFinalFilenames.add(
+          KV.of(
+              new FileResult<Void>(
+                  LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null),
+              finalFilename));
     }
 
     // Copy input files to output files.
-    writeOp.copyToOutputFiles(inputFilePaths);
+    writeOp.copyToOutputFiles(resultsToFinalFilenames);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -302,7 +316,7 @@ public class FileBasedSinkTest {
   }
 
   public List<ResourceId> generateDestinationFilenames(
-      ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
+      FilenamePolicy policy, int numFiles) {
     List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
       filenames.add(policy.unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED));
@@ -327,17 +341,17 @@ public class FileBasedSinkTest {
             root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
             root.resolve("file.00001.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
             root.resolve("file.00002.of.00003.test", StandardResolveOptions.RESOLVE_FILE));
-    actual = generateDestinationFilenames(root, policy, 3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected =
         Collections.singletonList(
             root.resolve("file.00000.of.00001.test", StandardResolveOptions.RESOLVE_FILE));
-    actual = generateDestinationFilenames(root, policy, 1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(root, policy, 0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -352,18 +366,19 @@ public class FileBasedSinkTest {
     ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
     ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
     ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
-    ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
       Iterable<FileResult<Void>> results =
           Lists.newArrayList(
-              new FileResult<Void>(temp1, 1, null, null, null),
-              new FileResult<Void>(temp2, 1, null, null, null),
-              new FileResult<Void>(temp3, 1, null, null, null));
-      writeOp.buildOutputFilenames(results);
+              new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
+              new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
+              new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
+      writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results);
       fail("Should have failed.");
-    } catch (IllegalStateException exn) {
-      assertEquals("Only generated 1 distinct file names for 3 files.", exn.getMessage());
+    } catch (IllegalArgumentException exn) {
+      assertThat(exn.getMessage(), containsString("generated the same name"));
+      assertThat(exn.getMessage(), containsString("temp1"));
+      assertThat(exn.getMessage(), containsString("temp2"));
     }
   }
 
@@ -383,17 +398,17 @@ public class FileBasedSinkTest {
             root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
             root.resolve("file-00001-of-00003", StandardResolveOptions.RESOLVE_FILE),
             root.resolve("file-00002-of-00003", StandardResolveOptions.RESOLVE_FILE));
-    actual = generateDestinationFilenames(root, policy, 3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected =
         Collections.singletonList(
             root.resolve("file-00000-of-00001", StandardResolveOptions.RESOLVE_FILE));
-    actual = generateDestinationFilenames(root, policy, 1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = generateDestinationFilenames(root, policy, 0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 0f40067..1ade0d0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -38,12 +38,8 @@ import com.google.common.collect.Lists;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
-import java.io.IOException;
-import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.SimpleFileVisitor;
-import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -68,50 +64,28 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
 
 /** Tests for {@link TextIO.Write}. */
 public class TextIOWriteTest {
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
 
-  private static Path tempFolder;
+  @Rule public transient TemporaryFolder tempFolder = new TemporaryFolder();
 
-  @Rule public TestPipeline p = TestPipeline.create();
+  @Rule public transient TestPipeline p = TestPipeline.create();
 
-  @Rule public ExpectedException expectedException = ExpectedException.none();
-
-  @BeforeClass
-  public static void setupClass() throws IOException {
-    tempFolder = Files.createTempDirectory("TextIOTest");
-  }
-
-  @AfterClass
-  public static void teardownClass() throws IOException {
-    Files.walkFileTree(
-        tempFolder,
-        new SimpleFileVisitor<Path>() {
-          @Override
-          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
-              throws IOException {
-            Files.delete(file);
-            return FileVisitResult.CONTINUE;
-          }
-
-          @Override
-          public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
-            Files.delete(dir);
-            return FileVisitResult.CONTINUE;
-          }
-        });
-  }
+  @Rule public transient ExpectedException expectedException = ExpectedException.none();
 
   static class TestDynamicDestinations
       extends FileBasedSink.DynamicDestinations<String, String, String> {
@@ -174,7 +148,9 @@ public class TextIOWriteTest {
   public void testDynamicDestinations() throws Exception {
     ResourceId baseDir =
         FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+            Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
+                .toString(),
+            true);
 
     List<String> elements = Lists.newArrayList("aaaa", "aaab", "baaa", "baab", "caaa", "caab");
     PCollection<String> input = p.apply(Create.of(elements).withCoder(StringUtf8Coder.of()));
@@ -262,7 +238,9 @@ public class TextIOWriteTest {
   public void testDynamicDefaultFilenamePolicy() throws Exception {
     ResourceId baseDir =
         FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, "testDynamicDestinations").toString(), true);
+            Files.createTempDirectory(tempFolder.getRoot().toPath(), "testDynamicDestinations")
+                .toString(),
+            true);
 
     List<UserWriteType> elements =
         Lists.newArrayList(
@@ -371,7 +349,7 @@ public class TextIOWriteTest {
   private void runTestWrite(String[] elems, String header, String footer, int numShards)
       throws Exception {
     String outputName = "file.txt";
-    Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
+    Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
     ResourceId baseFilename =
         FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());
 
@@ -544,7 +522,7 @@ public class TextIOWriteTest {
     String outputName = "file.txt";
     ResourceId baseDir =
         FileSystems.matchNewResource(
-            Files.createTempDirectory(tempFolder, "testwrite").toString(), true);
+            Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);
 
     PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
 
@@ -640,4 +618,34 @@ public class TextIOWriteTest {
 
     p.apply(Create.of("")).apply(TextIO.write().to(options.getOutput()));
   }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testWindowedWritesWithOnceTrigger() throws Throwable {
+    // Tests for https://issues.apache.org/jira/browse/BEAM-3169
+    PCollection<String> data =
+        p.apply(Create.of("0", "1", "2"))
+            .apply(
+                Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
+                    // According to this trigger, all data should be written.
+                    // However, the continuation of this trigger is elementCountAtLeast(1),
+                    // so with a buggy implementation that used a GBK before renaming files,
+                    // only 1 file would be renamed.
+                    .triggering(AfterPane.elementCountAtLeast(3))
+                    .withAllowedLateness(Duration.standardMinutes(1))
+                    .discardingFiredPanes());
+    PCollection<String> filenames =
+        data.apply(
+                TextIO.write()
+                    .to(new File(tempFolder.getRoot(), "windowed-writes").getAbsolutePath())
+                    .withNumShards(2)
+                    .withWindowedWrites()
+                    .<Void>withOutputFilenames())
+            .getPerDestinationOutputFilenames()
+            .apply(Values.<String>create());
+
+    PAssert.that(filenames.apply(TextIO.readAll())).containsInAnyOrder("0", "1", "2");
+
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2eea9fee/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index e0f7b39..40ae0ea 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -21,11 +21,14 @@ import static com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -35,7 +38,6 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.nio.file.Paths;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,6 +85,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.commons.compress.utils.Sets;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.format.DateTimeFormatter;
 import org.joda.time.format.ISODateTimeFormat;
@@ -156,10 +159,6 @@ public class WriteFilesTest {
     }
   }
 
-  private String appendToTempFolder(String filename) {
-    return Paths.get(tmpFolder.getRoot().getPath(), filename).toString();
-  }
-
   private String getBaseOutputFilename() {
     return getBaseOutputDirectory().resolve("file", StandardResolveOptions.RESOLVE_FILE).toString();
   }
@@ -187,7 +186,11 @@ public class WriteFilesTest {
         IDENTITY_MAP,
         getBaseOutputFilename(),
         WriteFiles.to(makeSimpleSink()));
-    checkFileContents(getBaseOutputFilename(), Collections.<String>emptyList(), Optional.of(1));
+    checkFileContents(
+        getBaseOutputFilename(),
+        Collections.<String>emptyList(),
+        Optional.of(1),
+        true /* expectRemovedTempDirectory */);
   }
 
   /**
@@ -241,7 +244,8 @@ public class WriteFilesTest {
 
     p.run();
 
-    checkFileContents(getBaseOutputFilename(), inputs, Optional.of(3));
+    checkFileContents(
+        getBaseOutputFilename(), inputs, Optional.of(3), true /* expectRemovedTempDirectory */);
   }
 
   /**
@@ -314,7 +318,10 @@ public class WriteFilesTest {
         inputs,
         Window.<String>into(FixedWindows.of(Duration.millis(2))),
         getBaseOutputFilename(),
-        WriteFiles.to(makeSimpleSink()).withMaxNumWritersPerBundle(2).withWindowedWrites());
+        WriteFiles.to(makeSimpleSink())
+            .withMaxNumWritersPerBundle(2)
+            .withWindowedWrites()
+            .withNumShards(1));
   }
 
   public void testBuildWrite() {
@@ -379,11 +386,10 @@ public class WriteFilesTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testUnboundedNeedsSharding() {
+  public void testWindowedWritesNeedSharding() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "When applying WriteFiles to an unbounded PCollection, "
-            + "must specify number of output shards explicitly");
+        "When using windowed writes, must specify number of output shards explicitly");
 
     SimpleSink<Void> sink = makeSimpleSink();
     p.apply(Create.of("foo"))
@@ -491,7 +497,11 @@ public class WriteFilesTest {
       for (int j = i; j < numInputs; j += 5) {
         expected.add("record_" + j);
       }
-      checkFileContents(base.toString(), expected, Optional.of(numShards));
+      checkFileContents(
+          base.toString(),
+          expected,
+          Optional.of(numShards),
+          bounded /* expectRemovedTempDirectory */);
     }
   }
 
@@ -659,14 +669,15 @@ public class WriteFilesTest {
     p.run();
 
     Optional<Integer> numShards =
-        (write.getNumShards() != null)
+        (write.getNumShards() != null && !write.isWindowedWrites())
             ? Optional.of(write.getNumShards().get())
             : Optional.<Integer>absent();
-    checkFileContents(baseName, inputs, numShards);
+    checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites());
   }
 
   static void checkFileContents(
-      String baseName, List<String> inputs, Optional<Integer> numExpectedShards)
+      String baseName, List<String> inputs, Optional<Integer> numExpectedShards,
+      boolean expectRemovedTempDirectory)
       throws IOException {
     List<File> outputFiles = Lists.newArrayList();
     final String pattern = baseName + "*";
@@ -675,6 +686,7 @@ public class WriteFilesTest {
     for (Metadata meta : metadata) {
       outputFiles.add(new File(meta.resourceId().toString()));
     }
+    assertFalse("Should have produced at least 1 output file", outputFiles.isEmpty());
     if (numExpectedShards.isPresent()) {
       assertEquals(numExpectedShards.get().intValue(), outputFiles.size());
       Pattern shardPattern = Pattern.compile("\\d{4}-of-\\d{4}");
@@ -710,6 +722,11 @@ public class WriteFilesTest {
       }
     }
     assertThat(actual, containsInAnyOrder(inputs.toArray()));
+    if (expectRemovedTempDirectory) {
+      assertThat(
+          Lists.newArrayList(new File(baseName).getParentFile().list()),
+          Matchers.everyItem(not(containsString(FileBasedSink.TEMP_DIRECTORY_PREFIX))));
+    }
   }
 
   /** Options for test, exposed for PipelineOptionsFactory. */