Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:54 UTC

[beam] 07/13: Refactors WriteFiles into sub-transforms

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2f73a9534a709dd1fe775ab31e0598d2d89f123c
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Fri Nov 17 12:25:45 2017 -0800

    Refactors WriteFiles into sub-transforms
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 630 +++++++++++----------
 1 file changed, 322 insertions(+), 308 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 87459e9..0a538b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -37,7 +36,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -176,53 +175,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
-  @Override
-  public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
-    if (input.isBounded() == IsBounded.UNBOUNDED) {
-      checkArgument(windowedWrites,
-          "Must use windowed writes when applying %s to an unbounded PCollection",
-          WriteFiles.class.getSimpleName());
-    }
-    if (windowedWrites) {
-      // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-      // and similar behavior in other runners.
-      checkArgument(
-          computeNumShards != null || numShardsProvider != null,
-          "When using windowed writes, must specify number of output shards explicitly",
-          WriteFiles.class.getSimpleName());
-    }
-    this.writeOperation = sink.createWriteOperation();
-    this.writeOperation.setWindowedWrites(windowedWrites);
-    return createWrite(input);
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-    sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else {
-      builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
   /** Returns the {@link FileBasedSink} associated with this PTransform. */
   public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
     return sink;
   }
 
-  /**
-   * Returns whether or not to perform windowed writes.
-   */
+  /** Returns whether or not to perform windowed writes. */
   public boolean isWindowedWrites() {
     return windowedWrites;
   }
@@ -339,50 +297,189 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
   }
 
-  private static class WriterKey<DestinationT> {
-    private final BoundedWindow window;
-    private final PaneInfo paneInfo;
-    private final DestinationT destination;
+  @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
 
-    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
-      this.window = window;
-      this.paneInfo = paneInfo;
-      this.destination = destination;
+  @Override
+  public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
+    if (input.isBounded() == IsBounded.UNBOUNDED) {
+      checkArgument(
+          windowedWrites,
+          "Must use windowed writes when applying %s to an unbounded PCollection",
+          WriteFiles.class.getSimpleName());
+    }
+    if (windowedWrites) {
+      // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+      // and similar behavior in other runners.
+      checkArgument(
+          computeNumShards != null || numShardsProvider != null,
+          "When using windowed writes, must specify number of output shards explicitly",
+          WriteFiles.class.getSimpleName());
     }
+    this.writeOperation = sink.createWriteOperation();
+    this.writeOperation.setWindowedWrites(windowedWrites);
 
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof WriterKey)) {
-        return false;
-      }
-      WriterKey other = (WriterKey) o;
-      return Objects.equal(window, other.window)
-          && Objects.equal(paneInfo, other.paneInfo)
-          && Objects.equal(destination, other.destination);
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing triggers.
+      input =
+          input.apply(
+              "RewindowIntoGlobal",
+              Window.<UserT>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
+
+    Coder<DestinationT> destinationCoder;
+    try {
+      destinationCoder =
+          getDynamicDestinations()
+              .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+      destinationCoder.verifyDeterministic();
+    } catch (CannotProvideCoderException | NonDeterministicException e) {
+      throw new RuntimeException(e);
+    }
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+    FileResultCoder<DestinationT> fileResultCoder =
+        FileResultCoder.of(windowCoder, destinationCoder);
+
+    PCollectionView<Integer> numShardsView =
+        (computeNumShards == null) ? null : input.apply(computeNumShards);
+
+    PCollection<FileResult<DestinationT>> tempFileResults =
+        (computeNumShards == null && numShardsProvider == null)
+            ? input.apply(
+                "WriteUnshardedBundlesToTempFiles",
+                new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+            : input.apply(
+                "WriteShardedBundlesToTempFiles",
+                new WriteShardedBundlesToTempFiles(
+                    destinationCoder, fileResultCoder, numShardsView));
+
+    return tempFileResults
+        .apply("GatherTempFileResults", new GatherResults<>(fileResultCoder))
+        .apply(
+            "FinalizeTempFileBundles",
+            new FinalizeTempFileBundles(numShardsView, destinationCoder));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
+        .include("sink", sink);
+    if (getSharding() != null) {
+      builder.include("sharding", getSharding());
+    } else {
+      builder.addIfNotNull(
+          DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"));
+    }
+  }
+
+  private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+    return (DynamicDestinations<UserT, DestinationT, OutputT>)
+        writeOperation.getSink().getDynamicDestinations();
+  }
+
+  private class GatherResults<ResultT>
+      extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+    private final Coder<ResultT> resultCoder;
+
+    private GatherResults(Coder<ResultT> resultCoder) {
+      this.resultCoder = resultCoder;
     }
 
     @Override
-    public int hashCode() {
-      return Objects.hashCode(window, paneInfo, destination);
+    public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+      if (windowedWrites) {
+        // Reshuffle the results to make them stable against retries.
+        // Use a single void key to maximize size of bundles for finalization.
+        return input
+            .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null))
+            .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
+            .apply("Drop key", Values.<ResultT>create())
+            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
+            .setCoder(IterableCoder.of(resultCoder));
+      } else {
+        // Pass results via a side input rather than reshuffle, because we need to get an empty
+        // iterable to finalize if there are no results.
+        return input
+            .getPipeline()
+            .apply(
+                Reify.viewInGlobalWindow(
+                    input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+      }
     }
   }
 
-  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
-  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
-  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
-  // this can be used as a key.
-  private static <DestinationT> int hashDestination(
-      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
-    return Hashing.murmur3_32()
-        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
-        .asInt();
+  private class WriteUnshardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteUnshardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+      PCollectionTuple writeTuple =
+          input.apply(
+              "WriteUnshardedBundles",
+              ParDo.of(
+                      new WriteUnshardedTempFilesWithSpillingFn(
+                          spilledRecordsTag, destinationCoder))
+                  .withSideInputs(sideInputs)
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+      PCollection<FileResult<DestinationT>> writtenBundleFiles =
+          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
+      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+      // finalize to stay consistent with what WriteWindowedBundles does.
+      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
+          writeTuple
+              .get(spilledRecordsTag)
+              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+              // Here we group by a synthetic shard number in the range [0, spill factor),
+              // just for the sake of getting some parallelism within each destination when
+              // writing the spilled records, whereas the non-spilled records don't have a shard
+              // number assigned at all. Drop the shard number on the spilled records so that
+              // shard numbers are assigned together to both the spilled and non-spilled files in
+              // finalize.
+              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply(
+                  "WriteSpilled",
+                  ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+              .setCoder(fileResultCoder)
+              .apply(
+                  "DropShardNum",
+                  ParDo.of(
+                      new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output(c.element().withShard(UNKNOWN_SHARDNUM));
+                        }
+                      }));
+      return PCollectionList.of(writtenBundleFiles)
+          .and(writtenSpilledFiles)
+          .apply(Flatten.<FileResult<DestinationT>>pCollections())
+          .setCoder(fileResultCoder);
+    }
   }
 
   /**
    * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
    * WriteOperation} associated with the {@link FileBasedSink}.
    */
-  private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
+  private class WriteUnshardedTempFilesWithSpillingFn
+      extends DoFn<UserT, FileResult<DestinationT>> {
     private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
     private final Coder<DestinationT> destinationCoder;
 
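[Editor's note, not part of the commit: a minimal, self-contained sketch of the stabilization pattern the new GatherResults transform uses in the windowed-writes branch above: key everything with a single void key, Reshuffle, then drop the key, so the gathered temp-file results are stable against bundle retries before finalization. The class name StabilizeAgainstRetries is hypothetical. In the unwindowed branch the commit instead passes results through a side input, so finalization still runs on an empty iterable when there are no results.]

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical standalone version of the "add void key -> Reshuffle -> drop key" pattern. */
class StabilizeAgainstRetries<T> extends PTransform<PCollection<T>, PCollection<T>> {
  @Override
  public PCollection<T> expand(PCollection<T> input) {
    return input
        // A single null key puts all elements on one key, maximizing bundle size downstream.
        .apply("Add void key", WithKeys.<Void, T>of((Void) null))
        // Reshuffle materializes the elements, so upstream retries cannot change what the
        // finalization step observes.
        .apply("Reshuffle", Reshuffle.<Void, T>of())
        // Remove the synthetic key again.
        .apply("Drop key", Values.<T>create());
  }
}
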
@@ -391,7 +488,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
-    WriteBundles(
+    WriteUnshardedTempFilesWithSpillingFn(
         TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
         Coder<DestinationT> destinationCoder) {
       this.unwrittenRecordsTag = unwrittenRecordsTag;
@@ -406,14 +503,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+      getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       PaneInfo paneInfo = c.pane();
       // If we are doing windowed writes, we need to ensure that we have separate files for
       // data in different windows/panes. Similar for dynamic writes, make sure that different
       // destinations go to different writers.
       // In the case of unwindowed writes, the window and the pane will always be the same, and
       // the map will only have a single element.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
+      DestinationT destination = getDynamicDestinations().getDestination(c.element());
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
       Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
@@ -444,7 +541,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           return;
         }
       }
-      writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element()));
+      writeOrClose(writer, getDynamicDestinations().formatRecord(c.element()));
     }
 
     @FinishBundle
@@ -468,64 +565,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             window);
       }
     }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(WriteFiles.this);
-    }
-  }
-
-  /*
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
-   * single iterable.
-   */
-  private class WriteShardedBundles
-      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
-      // in this iterable. The number of destinations is generally very small (1000s or less), so
-      // there will rarely be hash collisions.
-      Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
-      for (UserT input : c.element().getValue()) {
-        DestinationT destination = sink.getDynamicDestinations().getDestination(input);
-        Writer<DestinationT, OutputT> writer = writers.get(destination);
-        if (writer == null) {
-          String uuid = UUID.randomUUID().toString();
-          LOG.info(
-              "Opening writer {} for window {} pane {} destination {}",
-              uuid,
-              window,
-              c.pane(),
-              destination);
-          writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
-          writers.put(destination, writer);
-        }
-        writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
-      }
-
-      // Close all writers.
-      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
-        Writer<DestinationT, OutputT> writer = entry.getValue();
-        try {
-          // Close the writer; if this throws let the error propagate.
-          writer.close();
-        } catch (Exception e) {
-          // If anything goes wrong, make sure to delete the temporary file.
-          writer.cleanup();
-          throw e;
-        }
-        int shard = c.element().getKey().getShardNumber();
-        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(WriteFiles.this);
-    }
   }
 
   private static <DestinationT, OutputT> void writeOrClose(
@@ -549,21 +588,90 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
+  private static class WriterKey<DestinationT> {
+    private final BoundedWindow window;
+    private final PaneInfo paneInfo;
+    private final DestinationT destination;
+
+    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
+      this.window = window;
+      this.paneInfo = paneInfo;
+      this.destination = destination;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof WriterKey)) {
+        return false;
+      }
+      WriterKey other = (WriterKey) o;
+      return Objects.equal(window, other.window)
+          && Objects.equal(paneInfo, other.paneInfo)
+          && Objects.equal(destination, other.destination);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(window, paneInfo, destination);
+    }
+  }
+
+  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
+  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
+  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
+  // this can be used as a key.
+  private static <DestinationT> int hashDestination(
+      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
+    return Hashing.murmur3_32()
+        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
+        .asInt();
+  }
+
+  private class WriteShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+    private final PCollectionView<Integer> numShardsView;
+
+    private WriteShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder,
+        Coder<FileResult<DestinationT>> fileResultCoder,
+        PCollectionView<Integer> numShardsView) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+      this.numShardsView = numShardsView;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      return input
+          .apply(
+              "ApplyShardingKey",
+              ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
+                  .withSideInputs(
+                      numShardsView == null
+                          ? ImmutableList.<PCollectionView<Integer>>of()
+                          : ImmutableList.of(numShardsView)))
+          .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+          .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
+          .apply(
+              "WriteShardsIntoTempFiles",
+              ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+          .setCoder(fileResultCoder);
+    }
+  }
+
+  private class ApplyShardingKeyFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
     private final @Nullable PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
     private final Coder<DestinationT> destinationCoder;
 
     private int shardNumber;
 
-    ApplyShardingKey(
-        PCollectionView<Integer> numShardsView,
-        ValueProvider<Integer> numShardsProvider,
-        Coder<DestinationT> destinationCoder) {
-      this.destinationCoder = destinationCoder;
+    ApplyShardingKeyFn(
+        @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
       this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
+      this.destinationCoder = destinationCoder;
+      this.shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
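
[Editor's note, not part of the commit: the hashDestination helper above relies on the fact that encoding a value with a deterministic Coder yields identical bytes on every worker, so hashing those bytes with murmur3_32 gives a grouping key that is stable across machines, unlike Object.hashCode(). A small usage sketch follows; the StableHash class and the example destination string are hypothetical.]

import com.google.common.hash.Hashing;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.util.CoderUtils;

class StableHash {
  /** Machine-stable hash of any value that has a deterministic Coder. */
  static <T> int of(T value, Coder<T> coder) throws IOException {
    return Hashing.murmur3_32()
        .hashBytes(CoderUtils.encodeToByteArray(coder, value))
        .asInt();
  }

  public static void main(String[] args) throws IOException {
    // The same destination always produces the same key on any JVM, so it is safe to use
    // as a GroupByKey key even though destinations can be arbitrary user types.
    System.out.println(StableHash.of("table-2017-11-17", StringUtf8Coder.of()));
  }
}
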
@@ -595,7 +703,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // the destinations. This does mean that multiple destinations might end up on the same shard,
       // however the number of collisions should be small, so there's no need to worry about memory
       // issues.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
+      DestinationT destination = getDynamicDestinations().getDestination(context.element());
       context.output(
           KV.of(
               ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
@@ -603,168 +711,86 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private static <DestinationT>
-      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>>
-          groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) {
-    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
-        ArrayListMultimap.create();
-    for (FileResult<DestinationT> result : results) {
-      res.put(KV.of(result.getDestination(), result.getWindow()), result);
+  private class WriteShardsIntoTempFilesFn
+      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
+      // in this iterable. The number of destinations is generally very small (1000s or less), so
+      // there will rarely be hash collisions.
+      Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
+      for (UserT input : c.element().getValue()) {
+        DestinationT destination = getDynamicDestinations().getDestination(input);
+        Writer<DestinationT, OutputT> writer = writers.get(destination);
+        if (writer == null) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info(
+              "Opening writer {} for window {} pane {} destination {}",
+              uuid,
+              window,
+              c.pane(),
+              destination);
+          writer = writeOperation.createWriter();
+          writer.open(uuid, destination);
+          writers.put(destination, writer);
+        }
+        writeOrClose(writer, getDynamicDestinations().formatRecord(input));
+      }
+
+      // Close all writers.
+      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
+        Writer<DestinationT, OutputT> writer = entry.getValue();
+        try {
+          // Close the writer; if this throws let the error propagate.
+          writer.close();
+        } catch (Exception e) {
+          // If anything goes wrong, make sure to delete the temporary file.
+          writer.cleanup();
+          throw e;
+        }
+        int shard = c.element().getKey().getShardNumber();
+        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
+      }
     }
-    return res;
   }
 
-  /**
-   * A write is performed as a sequence of three {@link ParDo}s.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
-   * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
-   * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
-   * respectively, and {@link Writer#write} method is called for every element in the bundle. The
-   * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
-   * FileBasedSink} for a description of writer results), one for each bundle.
-   *
-   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
-   * the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * WriteOperation.
-   */
-  private WriteFilesResult<DestinationT> createWrite(PCollection<UserT> input) {
-    Pipeline p = input.getPipeline();
+  private class FinalizeTempFileBundles
+      extends PTransform<
+          PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+    @Nullable private final PCollectionView<Integer> numShardsView;
+    private final Coder<DestinationT> destinationCoder;
 
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing triggers.
-      input =
-          input.apply(
-              Window.<UserT>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
+    private FinalizeTempFileBundles(
+        @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
+      this.numShardsView = numShardsView;
+      this.destinationCoder = destinationCoder;
     }
 
-    final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations =
-        writeOperation.getSink().getDynamicDestinations();
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with the
-    // WriteOperation as a side input) and collect the results of the writes in a
-    // PCollection. There is a dependency between this ParDo and the first (the
-    // WriteOperation PCollection as a side input), so this will happen after the
-    // initial ParDo.
-    final PCollectionView<Integer> numShardsView =
-        (computeNumShards == null) ? null : input.apply(computeNumShards);
-    List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
-        ? ImmutableList.<PCollectionView<Integer>>of()
-        : ImmutableList.of(numShardsView);
+    @Override
+    public WriteFilesResult<DestinationT> expand(
+        PCollection<Iterable<FileResult<DestinationT>>> input) {
 
-    @SuppressWarnings("unchecked")
-    Coder<BoundedWindow> shardedWindowCoder =
-        (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
-    final Coder<DestinationT> destinationCoder;
-    try {
-      destinationCoder =
-          destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
-      destinationCoder.verifyDeterministic();
-    } catch (CannotProvideCoderException | NonDeterministicException e) {
-      throw new RuntimeException(e);
-    }
-    final FileResultCoder<DestinationT> fileResultCoder =
-        FileResultCoder.of(shardedWindowCoder, destinationCoder);
-
-    PCollection<FileResult<DestinationT>> results;
-    if (computeNumShards == null && numShardsProvider == null) {
-      TupleTag<FileResult<DestinationT>> writtenRecordsTag =
-          new TupleTag<>("writtenRecordsTag");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
-          new TupleTag<>("spilledRecordsTag");
-      String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
-      PCollectionTuple writeTuple =
-          input.apply(
-              writeName,
-              ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
-                  .withSideInputs(sideInputs)
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
-      PCollection<FileResult<DestinationT>> writtenBundleFiles =
-          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
-      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
-      // finalize to stay consistent with what WriteWindowedBundles does.
-      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
-          writeTuple
-              .get(spilledRecordsTag)
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              // Here we group by a synthetic shard number in the range [0, spill factor),
-              // just for the sake of getting some parallelism within each destination when
-              // writing the spilled records, whereas the non-spilled records don't have a shard
-              // number assigned at all. Drop the shard number on the spilled records so that
-              // shard numbers are assigned together to both the spilled and non-spilled files in
-              // finalize.
-              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
-              .apply(
-                  "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
-              .setCoder(fileResultCoder)
-              .apply("DropShardNum", ParDo.of(
-                  new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
-                    @ProcessElement
-                    public void process(ProcessContext c) {
-                      c.output(c.element().withShard(UNKNOWN_SHARDNUM));
-                    }
-                  }));
-      results =
-          PCollectionList.of(writtenBundleFiles)
-              .and(writtenSpilledFiles)
-              .apply(Flatten.<FileResult<DestinationT>>pCollections());
-    } else {
-      results =
+      List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs);
+      if (numShardsView != null) {
+        finalizeSideInputs.add(numShardsView);
+      }
+      PCollection<KV<DestinationT, String>> outputFilenames =
           input
-              .apply(
-                  "ApplyShardLabel",
-                  ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder))
-                      .withSideInputs(shardingSideInputs))
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
-              .apply(
-                  "WriteShardedBundles",
-                  ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs));
-    }
-    results.setCoder(fileResultCoder);
+              .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(finalizeSideInputs))
+              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
 
-    PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles;
-    if (windowedWrites) {
-      // Reshuffle the results to make them stable against retries.
-      // Use a single void key to maximize size of bundles for finalization.
-      PCollection<FileResult<DestinationT>> stableResults = results
-          .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
-          .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
-          .apply("Drop key", Values.<FileResult<DestinationT>>create());
-      fileResultBundles =
-          stableResults
-              .apply(
-                  "Gather bundles",
-                  ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>()))
-              .setCoder(IterableCoder.of(fileResultCoder));
-    } else {
-      // Pass results via a side input rather than reshuffle, because we need to get an empty
-      // iterable to finalize if there are no results.
-      fileResultBundles =
-          p.apply(
-              Reify.viewInGlobalWindow(
-                  results.apply(View.<FileResult<DestinationT>>asIterable()),
-                  IterableCoder.of(fileResultCoder)));
+      TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
+          new TupleTag<>("perDestinationOutputFilenames");
+      return WriteFilesResult.in(
+          input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
     }
 
-    class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+    private class FinalizeFn
+        extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
       @ProcessElement
       public void process(ProcessContext c) throws Exception {
-        writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+        getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
         @Nullable Integer fixedNumShards;
         if (numShardsView != null) {
           fixedNumShards = c.sideInput(numShardsView);
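
[Editor's note, not part of the commit: WriteShardsIntoTempFilesFn above opens one writer per destination seen in a shard, closes them all at the end, and deletes a writer's temporary file if closing fails; the removed javadoc likewise notes that a failed write closes the writer before the error propagates. The following plain-Java sketch, with a hypothetical Writer interface and PerDestinationWriters class, illustrates that lifecycle outside of Beam.]

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class PerDestinationWriters<D> {
  /** Hypothetical stand-in for FileBasedSink.Writer. */
  interface Writer {
    void write(String record) throws Exception;
    void close() throws Exception;
    void cleanup() throws Exception; // delete the temporary file
  }

  private final Map<D, Writer> writers = new HashMap<>();

  /** Lazily opens a writer for each destination seen in the bundle, then writes to it. */
  void write(D destination, String record, Supplier<Writer> writerFactory) throws Exception {
    Writer writer = writers.computeIfAbsent(destination, d -> writerFactory.get());
    try {
      writer.write(record);
    } catch (Exception e) {
      // On a failed write, close the writer (best effort) before propagating the error,
      // so its result is discarded rather than finalized.
      try {
        writer.close();
      } catch (Exception suppressed) {
        e.addSuppressed(suppressed);
      }
      throw e;
    }
  }

  /** Closes every writer; if closing fails, removes the temporary file and rethrows. */
  void closeAll() throws Exception {
    for (Writer writer : writers.values()) {
      try {
        writer.close();
      } catch (Exception e) {
        writer.cleanup();
        throw e;
      }
    }
  }
}
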
@@ -776,11 +802,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         }
         List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
         LOG.info("Finalizing {} file results", fileResults.size());
-        DestinationT defaultDest = destinations.getDefaultDestination();
+        DestinationT defaultDest = getDynamicDestinations().getDefaultDestination();
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
             fileResults.isEmpty()
                 ? writeOperation.finalizeDestination(
-                defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
+                    defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
                 : finalizeAllDestinations(fileResults, fixedNumShards);
         for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
           FileResult<DestinationT> res = entry.getKey();
@@ -789,18 +815,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         writeOperation.moveToOutputFiles(resultsToFinalFilenames);
       }
     }
-
-    List<PCollectionView<?>> sideInputs =
-        FluentIterable.concat(this.sideInputs, shardingSideInputs).toList();
-    PCollection<KV<DestinationT, String>> outputFilenames =
-        fileResultBundles
-            .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs))
-            .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
-
-    TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
-        new TupleTag<>("perDestinationOutputFilenames");
-    return WriteFilesResult.in(
-        input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
   }
 
   private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.