Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:52 UTC

[beam] 05/13: consolidates windowed/unwindowed finalize fns somewhat

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 97df5e703d4a891ab63a40b46c4e87d7c373168b
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 19:51:10 2017 -0800

    consolidates windowed/unwindowed finalize fns somewhat
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 138 +++++++++----
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 221 ++++++---------------
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |  19 +-
 3 files changed, 171 insertions(+), 207 deletions(-)
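
For readers skimming the patch: the visible API change is that WriteOperation.buildOutputFilenames becomes finalizeDestination, which now also creates any missing empty shards for unwindowed writes, and the old copyToOutputFiles plus removeTemporaryFiles pair collapses into a single moveToOutputFiles. A minimal sketch of the new call sequence, modeled on the updated FileBasedSinkTest further down (writeOp and fileResults stand in for the test's local variables; nothing here is new API beyond the two renamed methods):

    // Sketch only: one unwindowed destination with runner-chosen sharding.
    List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
        writeOp.finalizeDestination(
            null /* destination */, null /* window */, null /* numShards */, fileResults);
    // moveToOutputFiles copies each temp file to its final name and then removes
    // the temp files itself, so callers no longer collect them separately.
    writeOp.moveToOutputFiles(resultsToFinalFilenames);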

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c8bdbfc..5bc84be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -28,10 +28,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,6 +41,7 @@ import java.io.Serializable;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -46,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -72,6 +76,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -103,10 +108,9 @@ import org.slf4j.LoggerFactory;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#open} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#open} is passed a unique
+ * <i>bundle id</i> when it is called by the WriteFiles transform, so even redundant or retried
+ * bundles will have a unique way of identifying their output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
@@ -447,7 +451,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
    * written,
    *
    * <ol>
-   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the
    *       output bundles.
    *   <li>During finalize, these temporary files are copied to final output locations and named
    *       according to a file naming template.
@@ -577,17 +581,22 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * not be cleaned up. Note that {@link WriteFiles} does attempt clean up files if exceptions
      * are thrown, however there are still some scenarios where temporary files might be left.
      */
-    public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException {
+    public void removeTemporaryFiles(Collection<ResourceId> filenames) throws IOException {
       removeTemporaryFiles(filenames, !windowedWrites);
     }
 
     @Experimental(Kind.FILESYSTEM)
-    protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames(
+    protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestination(
         @Nullable DestinationT dest,
         @Nullable BoundedWindow window,
         @Nullable Integer numShards,
-        Iterable<FileResult<DestinationT>> writerResults) {
-      for (FileResult<DestinationT> res : writerResults) {
+        Collection<FileResult<DestinationT>> existingResults) throws Exception {
+      Collection<FileResult<DestinationT>> completeResults =
+          windowedWrites
+              ? existingResults
+              : createMissingEmptyShards(dest, numShards, existingResults);
+
+      for (FileResult<DestinationT> res : completeResults) {
         checkArgument(
             Objects.equals(dest, res.getDestination()),
             "File result has wrong destination: expected %s, got %s",
@@ -602,7 +611,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       final int effectiveNumShards;
       if (numShards != null) {
         effectiveNumShards = numShards;
-        for (FileResult<DestinationT> res : writerResults) {
+        for (FileResult<DestinationT> res : completeResults) {
           checkArgument(
               res.getShard() != UNKNOWN_SHARDNUM,
               "Fixed sharding into %s shards was specified, "
@@ -611,8 +620,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
               res);
         }
       } else {
-        effectiveNumShards = Iterables.size(writerResults);
-        for (FileResult<DestinationT> res : writerResults) {
+        effectiveNumShards = Iterables.size(completeResults);
+        for (FileResult<DestinationT> res : completeResults) {
           checkArgument(
               res.getShard() == UNKNOWN_SHARDNUM,
               "Runner-chosen sharding was specified, "
@@ -623,7 +632,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
       List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
       if (numShards != null) {
-        resultsWithShardNumbers = Lists.newArrayList(writerResults);
+        resultsWithShardNumbers = Lists.newArrayList(completeResults);
       } else {
         checkState(
             !windowedWrites,
@@ -644,7 +653,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
                         return firstFilename.compareTo(secondFilename);
                       }
                     })
-                .sortedCopy(writerResults);
+                .sortedCopy(completeResults);
         for (int i = 0; i < sortedByTempFilename.size(); i++) {
           resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
         }
@@ -672,10 +681,71 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       return outputFilenames;
     }
 
+    private Collection<FileResult<DestinationT>> createMissingEmptyShards(
+        @Nullable DestinationT dest,
+        @Nullable Integer numShards,
+        Collection<FileResult<DestinationT>> existingResults)
+        throws Exception {
+      Collection<FileResult<DestinationT>> completeResults;
+      LOG.info("Finalizing for destination {} num shards {}.", dest, existingResults.size());
+      if (numShards != null) {
+        checkArgument(
+            existingResults.size() <= numShards,
+            "Fixed sharding into %s shards was specified, but got %s file results",
+            numShards,
+            existingResults.size());
+      }
+      // We must always output at least 1 shard, and honor user-specified numShards
+      // if set.
+      Set<Integer> missingShardNums;
+      if (numShards == null) {
+        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+      } else {
+        missingShardNums = Sets.newHashSet();
+        for (int i = 0; i < numShards; ++i) {
+          missingShardNums.add(i);
+        }
+        for (FileResult<DestinationT> res : existingResults) {
+          checkArgument(
+              res.getShard() != UNKNOWN_SHARDNUM,
+              "Fixed sharding into %s shards was specified, "
+                  + "but file result %s does not specify a shard",
+              numShards,
+              res);
+          missingShardNums.remove(res.getShard());
+        }
+      }
+      completeResults = Lists.newArrayList(existingResults);
+      if (!missingShardNums.isEmpty()) {
+        LOG.info(
+            "Creating {} empty output shards in addition to {} written for destination {}.",
+            missingShardNums.size(),
+            existingResults.size(),
+            dest);
+        for (int shard : missingShardNums) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info("Opening empty writer {} for destination {}", uuid, dest);
+          Writer<DestinationT, ?> writer = createWriter();
+          // Currently this code path is only called in the unwindowed case.
+          writer.open(uuid, dest);
+          writer.close();
+          completeResults.add(
+              new FileResult<>(
+                  writer.getOutputFile(),
+                  shard,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  dest));
+        }
+        LOG.debug("Done creating extra shards for {}.", dest);
+      }
+      return completeResults;
+    }
+
     /**
      * Copy temporary files to final output filenames using the file naming template.
      *
-     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
      *
      * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
      * will be the same as the sorted order of the input filenames. In other words (when using
@@ -686,40 +756,38 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
-    final void copyToOutputFiles(
+    final void moveToOutputFiles(
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
       int numFiles = resultsToFinalFilenames.size();
-      if (numFiles > 0) {
-        LOG.debug("Copying {} files.", numFiles);
-        List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
-        List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
-        for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
-          srcFiles.add(entry.getKey().getTempFilename());
-          dstFiles.add(entry.getValue());
-          LOG.info(
-              "Will copy temporary file {} to final location {}",
-              entry.getKey().getTempFilename(),
-              entry.getValue());
-        }
-        // During a failure case, files may have been deleted in an earlier step. Thus
-        // we ignore missing files here.
-        FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
-      } else {
-        LOG.info("No output files to write.");
+      LOG.debug("Copying {} files.", numFiles);
+      List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+        srcFiles.add(entry.getKey().getTempFilename());
+        dstFiles.add(entry.getValue());
+        LOG.info(
+            "Will copy temporary file {} to final location {}",
+            entry.getKey().getTempFilename(),
+            entry.getValue());
       }
+      // During a failure case, files may have been deleted in an earlier step. Thus
+      // we ignore missing files here.
+      FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+      removeTemporaryFiles(srcFiles);
     }
 
     /**
      * Removes temporary output files. Uses the temporary directory to find files to remove.
      *
-     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
     final void removeTemporaryFiles(
-        Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
+        Collection<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory)
+        throws IOException {
       ResourceId tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
 
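
The new createMissingEmptyShards helper above is what lets finalizeDestination absorb the "fill empty shards" logic that previously lived only in WriteFiles' unwindowed finalize path. A rough paraphrase of its fixed-sharding branch (local names here are illustrative, not additional API):

    // Start by assuming every shard 0..numShards-1 is missing...
    Set<Integer> missingShardNums = Sets.newHashSet();
    for (int i = 0; i < numShards; ++i) {
      missingShardNums.add(i);
    }
    // ...then cross off the shards that actually produced a file result.
    for (FileResult<DestinationT> res : existingResults) {
      missingShardNums.remove(res.getShard());
    }
    // For each shard still missing, a writer is opened and immediately closed so
    // that an empty temp file exists, recorded as a FileResult in the
    // GlobalWindow (this path only runs for unwindowed writes).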
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 28ac1a5..9cfabfe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -26,18 +26,14 @@ import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
@@ -66,6 +62,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -630,7 +627,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * FileBasedSink} for a description of writer results)-one for each bundle.
    *
    * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
    * the write.
    *
    * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
@@ -661,11 +658,25 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollectionView<Integer> numShardsView =
+    final PCollectionView<Integer> numShardsView =
         (computeNumShards == null) ? null : input.apply(computeNumShards);
     List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
         ? ImmutableList.<PCollectionView<Integer>>of()
         : ImmutableList.of(numShardsView);
+    SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards =
+        new SerializableFunction<DoFn.ProcessContext, Integer>() {
+          @Override
+          public Integer apply(DoFn<?, ?>.ProcessContext c) {
+            if (numShardsView != null) {
+              return c.sideInput(numShardsView);
+            } else if (numShardsProvider != null) {
+              return numShardsProvider.get();
+            } else {
+              return null;
+            }
+          }
+        };
+
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -755,16 +766,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply(Values.<FileResult<DestinationT>>create())
               .apply(
                   "FinalizeWindowed",
-                  ParDo.of(
-                          new FinalizeWindowedFn<>(
-                              numShardsView, numShardsProvider, writeOperation))
-                      .withSideInputs(
-                          numShardsView == null
-                              ? ImmutableList.<PCollectionView<?>>of()
-                              : ImmutableList.of(numShardsView)))
+                  ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation))
+                      .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     } else {
-      final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
+      PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
 
       // Finalize the write in another do-once ParDo on the singleton collection containing the
@@ -775,14 +781,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // For the non-windowed case, we guarantee that  if no data is written but the user has
       // set numShards, then all shards will be written out as empty files. For this reason we
       // use a side input here.
-      PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
       outputFilenames =
-          singletonCollection
+          p.apply(Create.of((Void) null))
               .apply(
                   "FinalizeUnwindowed",
                   ParDo.of(
                           new FinalizeUnwindowedFn<>(
-                              numShardsView, numShardsProvider, resultsView, writeOperation))
+                              getFixedNumShards, resultsView, writeOperation))
                       .withSideInputs(
                           FluentIterable.concat(sideInputs, shardingSideInputs)
                               .append(resultsView)
@@ -798,19 +803,16 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
   private static class FinalizeWindowedFn<DestinationT>
       extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
-    @Nullable private final PCollectionView<Integer> numShardsView;
-    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
     private final WriteOperation<DestinationT, ?> writeOperation;
 
     @Nullable private transient List<FileResult<DestinationT>> fileResults;
     @Nullable private Integer fixedNumShards;
 
     public FinalizeWindowedFn(
-        @Nullable PCollectionView<Integer> numShardsView,
-        @Nullable ValueProvider<Integer> numShardsProvider,
+        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
         WriteOperation<DestinationT, ?> writeOperation) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
+      this.getFixedNumShards = getFixedNumShards;
       this.writeOperation = writeOperation;
     }
 
@@ -824,58 +826,37 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public void processElement(ProcessContext c) {
       fileResults.add(c.element());
       if (fixedNumShards == null) {
-        if (numShardsView != null) {
-          fixedNumShards = c.sideInput(numShardsView);
-        } else if (numShardsProvider != null) {
-          fixedNumShards = numShardsProvider.get();
-        } else {
-          throw new IllegalStateException(
-              "When finalizing a windowed write, should have set fixed sharding");
-        }
+        fixedNumShards = getFixedNumShards.apply(c);
+        checkState(fixedNumShards != null, "Windowed write should have set fixed sharding");
       }
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      Set<ResourceId> tempFiles = Sets.newHashSet();
-      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results =
-          groupByDestinationAndWindow(fileResults);
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
-      for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
-          destEntry : results.asMap().entrySet()) {
-        DestinationT destination = destEntry.getKey().getKey();
-        BoundedWindow window = destEntry.getKey().getValue();
-        resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames(
-            destination, window, fixedNumShards, destEntry.getValue()));
-      }
-      LOG.info("Will finalize {} files", resultsToFinalFilenames.size());
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+          finalizeAllDestinations(writeOperation, fileResults, fixedNumShards);
       for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         FileResult<DestinationT> res = entry.getKey();
-        tempFiles.add(res.getTempFilename());
         c.output(
             KV.of(res.getDestination(), entry.getValue().toString()),
             res.getWindow().maxTimestamp(),
             res.getWindow());
       }
-      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
-      writeOperation.removeTemporaryFiles(tempFiles);
+      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
   }
 
   private static class FinalizeUnwindowedFn<DestinationT>
       extends DoFn<Void, KV<DestinationT, String>> {
-    @Nullable private final PCollectionView<Integer> numShardsView;
-    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
     private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
     private final WriteOperation<DestinationT, ?> writeOperation;
 
     public FinalizeUnwindowedFn(
-        @Nullable PCollectionView<Integer> numShardsView,
-        @Nullable ValueProvider<Integer> numShardsProvider,
+        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
         PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
         WriteOperation<DestinationT, ?> writeOperation) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
+      this.getFixedNumShards = getFixedNumShards;
       this.resultsView = resultsView;
       this.writeOperation = writeOperation;
     }
@@ -883,118 +864,40 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      @Nullable Integer fixedNumShards;
-      if (numShardsView != null) {
-        fixedNumShards = c.sideInput(numShardsView);
-      } else if (numShardsProvider != null) {
-        fixedNumShards = numShardsProvider.get();
-      } else {
-        fixedNumShards = null;
-      }
-      Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap =
-          ArrayListMultimap.create();
-      for (FileResult<DestinationT> result : c.sideInput(resultsView)) {
-        resultsByDestMultimap.put(result.getDestination(), result);
-      }
-      Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest =
-          resultsByDestMultimap.asMap();
-      if (resultsByDest.isEmpty()) {
-        Collection<FileResult<DestinationT>> empty = ImmutableList.of();
-        resultsByDest =
-            Collections.singletonMap(
-                writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty);
-      }
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
-      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>>
-          destEntry : resultsByDest.entrySet()) {
-        resultsToFinalFilenames.addAll(
-            finalizeForDestinationFillEmptyShards(
-                destEntry.getKey(), fixedNumShards, destEntry.getValue()));
-      }
-      Set<ResourceId> tempFiles = Sets.newHashSet();
-      for (KV<FileResult<DestinationT>, ResourceId> entry :
-          resultsToFinalFilenames) {
-        tempFiles.add(entry.getKey().getTempFilename());
+      List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView));
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+          fileResults.isEmpty()
+              ? writeOperation.finalizeDestination(
+                  writeOperation.getSink().getDynamicDestinations().getDefaultDestination(),
+                  GlobalWindow.INSTANCE,
+                  getFixedNumShards.apply(c),
+                  ImmutableList.<FileResult<DestinationT>>of())
+              : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c));
+      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
       }
-      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
-      writeOperation.removeTemporaryFiles(tempFiles);
+      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
+  }
 
-    /**
-     * Finalize a list of files for a single destination. If a minimum number of shards is needed,
-     * this function will generate empty files for this destination to ensure that all shards are
-     * generated.
-     */
-    private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards(
-        DestinationT destination,
-        @Nullable Integer fixedNumShards,
-        Collection<FileResult<DestinationT>> existingResults)
-        throws Exception {
-      checkState(!writeOperation.windowedWrites);
-
-      LOG.info(
-          "Finalizing write operation {} for destination {} num shards {}.",
-          writeOperation,
-          destination,
-          existingResults.size());
-      if (fixedNumShards != null) {
-        checkArgument(
-            existingResults.size() <= fixedNumShards,
-            "Fixed sharding into %s shards was specified, but got %s file results",
-            fixedNumShards,
-            existingResults.size());
-      }
-      // We must always output at least 1 shard, and honor user-specified numShards
-      // if set.
-      Set<Integer> missingShardNums;
-      if (fixedNumShards == null) {
-        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
-      } else {
-        missingShardNums = Sets.newHashSet();
-        for (int i = 0; i < fixedNumShards; ++i) {
-          missingShardNums.add(i);
-        }
-        for (FileResult<DestinationT> res : existingResults) {
-          checkArgument(
-              res.getShard() != UNKNOWN_SHARDNUM,
-              "Fixed sharding into %s shards was specified, "
-                  + "but file result %s does not specify a shard",
+  private static <DestinationT>
+      List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
+          WriteOperation<DestinationT, ?> writeOperation,
+          List<FileResult<DestinationT>> fileResults,
+          Integer fixedNumShards)
+          throws Exception {
+    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap =
+        groupByDestinationAndWindow(fileResults);
+    for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+        destEntry : resultsByDestMultimap.asMap().entrySet()) {
+      resultsToFinalFilenames.addAll(
+          writeOperation.finalizeDestination(
+              destEntry.getKey().getKey(),
+              destEntry.getKey().getValue(),
               fixedNumShards,
-              res);
-          missingShardNums.remove(res.getShard());
-        }
-      }
-      List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults);
-      if (!missingShardNums.isEmpty()) {
-        LOG.info(
-            "Creating {} empty output shards in addition to {} written for destination {}.",
-            missingShardNums.size(),
-            existingResults.size(),
-            destination);
-        for (int shard : missingShardNums) {
-          String uuid = UUID.randomUUID().toString();
-          LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination);
-          Writer<DestinationT, ?> writer = writeOperation.createWriter();
-          // Currently this code path is only called in the unwindowed case.
-          writer.open(uuid, destination);
-          writer.close();
-          completeResults.add(
-              new FileResult<>(
-                  writer.getOutputFile(),
-                  shard,
-                  GlobalWindow.INSTANCE,
-                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
-                  destination));
-        }
-        LOG.debug("Done creating extra shards for {}.", destination);
-      }
-      return
-          writeOperation.buildOutputFilenames(
-              destination,
-              GlobalWindow.INSTANCE,
-              (fixedNumShards == null) ? null : completeResults.size(),
-              completeResults);
+              destEntry.getValue()));
     }
+    return resultsToFinalFilenames;
   }
 }
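
The other consolidation in WriteFiles.java is that the numShardsView / numShardsProvider branching, previously duplicated across FinalizeWindowedFn and FinalizeUnwindowedFn, is captured once in the getFixedNumShards SerializableFunction. Written as a plain method for readability (same logic as the anonymous class in the diff above):

    // Shared fixed-shard lookup; returns null when the runner may choose sharding.
    @Nullable
    Integer getFixedNumShards(DoFn<?, ?>.ProcessContext c) {
      if (numShardsView != null) {
        return c.sideInput(numShardsView);   // shard count computed as a side input
      } else if (numShardsProvider != null) {
        return numShardsProvider.get();      // shard count fixed via ValueProvider
      }
      return null;
    }
    // FinalizeWindowedFn requires a non-null result (windowed writes must use fixed
    // sharding), while FinalizeUnwindowedFn accepts null and relies on
    // finalizeDestination to create any missing empty shards.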
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index f7988bb..561d036 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,7 +44,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -205,13 +203,8 @@ public class FileBasedSinkTest {
 
     // TODO: test with null first argument?
     List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
-        writeOp.buildOutputFilenames(null, null, null, fileResults);
-    Set<ResourceId> tempFiles = Sets.newHashSet();
-    for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) {
-      tempFiles.add(res.getKey().getTempFilename());
-    }
-    writeOp.copyToOutputFiles(resultsToFinalFilenames);
-    writeOp.removeTemporaryFiles(tempFiles);
+        writeOp.finalizeDestination(null, null, null, fileResults);
+    writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     for (int i = 0; i < numFiles; i++) {
       ResourceId outputFilename =
@@ -304,7 +297,7 @@ public class FileBasedSinkTest {
     }
 
     // Copy input files to output files.
-    writeOp.copyToOutputFiles(resultsToFinalFilenames);
+    writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -355,7 +348,7 @@ public class FileBasedSinkTest {
 
   /** Reject non-distinct output filenames. */
   @Test
-  public void testCollidingOutputFilenames() throws IOException {
+  public void testCollidingOutputFilenames() throws Exception {
     ResourceId root = getBaseOutputDirectory();
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
@@ -366,12 +359,12 @@ public class FileBasedSinkTest {
     ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult<Void>> results =
+      List<FileResult<Void>> results =
           Lists.newArrayList(
               new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
               new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
               new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
-      writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results);
+      writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
       fail("Should have failed.");
     } catch (IllegalArgumentException exn) {
       assertThat(exn.getMessage(), containsString("generated the same name"));
