Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:47 UTC

[beam] branch master updated (b059664 -> 761ec1a)

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from b059664  Merge pull request #4218: Build output redirection option
     new 90402e4  enforce fixed sharding
     new 5795c32  Merges Writer.openWindowed/Unwindowed and removes result of close()
     new b2d0671  non-null window/pane in FileResult
     new 54eacf4  remove ShardAssignment
     new 97df5e7  consolidates windowed/unwindowed finalize fns somewhat
     new c615438  Unifies windowed and unwindowed finalize.
     new 2f73a95  Refactors WriteFiles into sub-transforms
     new 5b600da  Converts WriteFiles to AutoValue
     new 3ecf13b  Makes checkstyle and findbugs happy
     new 83837eb  Renames spilled back to unwritten
     new 060f05c  Fixes tests
     new d314339  Reintroduces dynamic sharding with windowed writes for bounded collections
     new 761ec1a  This closes #4145: Many simplifications to WriteFiles

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/beam/examples/WindowedWordCount.java    |    5 +-
 .../examples/common/WriteOneFilePerWindow.java     |   12 +-
 .../apache/beam/examples/WindowedWordCountIT.java  |    8 +
 .../beam/runners/apex/examples/WordCountTest.java  |    2 +-
 .../core/construction/WriteFilesTranslation.java   |    7 +-
 .../construction/WriteFilesTranslationTest.java    |   13 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java  |    2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java    |    5 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |  281 +++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 1061 ++++++++------------
 .../java/org/apache/beam/sdk/transforms/Reify.java |   73 +-
 .../apache/beam/sdk/values/TypeDescriptors.java    |    4 +
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |   66 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |   27 +-
 14 files changed, 745 insertions(+), 821 deletions(-)
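
The upshot for pipeline authors is an unchanged public API but a stricter contract: with these changes, windowed writes must in general declare an explicit shard count (see "enforce fixed sharding" above), and dynamic sharding is reintroduced only for bounded collections in the second-to-last commit. Below is a minimal, illustrative sketch of that contract through the public TextIO surface; the bucket paths are hypothetical and options handling is kept to the bare minimum.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class WindowedWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply(TextIO.read().from("gs://my-bucket/input/*"))        // hypothetical input path
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(TextIO.write()
            .to("gs://my-bucket/output/words")                   // hypothetical output prefix
            .withWindowedWrites()
            .withNumShards(3));  // explicit shard count, as enforced for windowed writes
    p.run().waitUntilFinish();
  }
}

This is the same requirement exercised by the WindowedWordCount example files that appear in the diffstat above.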

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].

[beam] 02/13: Merges Writer.openWindowed/Unwindowed and removes result of close()

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5795c32c91a6de02b6731eacb5eef8ae55f069f5
Author: Eugene Kirpichov <ek...@gmail.com>
AuthorDate: Tue Oct 17 17:06:43 2017 -0700

    Merges Writer.openWindowed/Unwindowed and removes result of close()
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 94 ++++++----------------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 69 +++++++++-------
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 15 ++--
 3 files changed, 71 insertions(+), 107 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index d4cb57d..2108253 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#openWindowed} or {@link
+ * result passed to the finalize method. Each call to {@link Writer#open} or {@link
  * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
  * transform, so even redundant or retried bundles will have a unique way of identifying their
  * output.
@@ -805,10 +805,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     /** Unique id for this output bundle. */
     private @Nullable String id;
 
-    private @Nullable BoundedWindow window;
-    private @Nullable PaneInfo paneInfo;
-    private int shard = -1;
-    private @Nullable DestinationT destination;
+    private DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
@@ -868,53 +865,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * id populated for the case of static sharding. In cases where the runner is dynamically
      * picking sharding, shard might be set to -1.
      */
-    public final void openWindowed(
-        String uId, BoundedWindow window, PaneInfo paneInfo, int shard, DestinationT destination)
-        throws Exception {
-      if (!getWriteOperation().windowedWrites) {
-        throw new IllegalStateException("openWindowed called a non-windowed sink.");
-      }
-      open(uId, window, paneInfo, shard, destination);
-    }
-
-    /** Called for each value in the bundle. */
-    public abstract void write(OutputT value) throws Exception;
-
-    /**
-     * Similar to {@link #openWindowed} however for the case where unwindowed writes were requested.
-     */
-    public final void openUnwindowed(String uId, int shard, DestinationT destination)
-        throws Exception {
-      if (getWriteOperation().windowedWrites) {
-        throw new IllegalStateException("openUnwindowed called a windowed sink.");
-      }
-      open(uId, null, null, shard, destination);
-    }
-
-    // Helper function to close a channel, on exception cases.
-    // Always throws prior exception, with any new closing exception suppressed.
-    private static void closeChannelAndThrow(
-        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
-      try {
-        channel.close();
-      } catch (Exception e) {
-        LOG.error("Closing channel for {} failed.", filename, e);
-        prior.addSuppressed(e);
-        throw prior;
-      }
-    }
-
-    private void open(
-        String uId,
-        @Nullable BoundedWindow window,
-        @Nullable PaneInfo paneInfo,
-        int shard,
-        DestinationT destination)
+    public final void open(
+        String uId, DestinationT destination)
         throws Exception {
       this.id = uId;
-      this.window = window;
-      this.paneInfo = paneInfo;
-      this.shard = shard;
       this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
@@ -925,15 +879,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
           getWriteOperation().getSink().writableByteChannelFactory;
       // The factory may force a MIME type or it may return null, indicating to use the sink's MIME.
       String channelMimeType = firstNonNull(factory.getMimeType(), mimeType);
-      LOG.info(
-          "Opening temporary file {} with MIME type {} "
-              + "to write destination {} shard {} window {} pane {}",
-          outputFile,
-          channelMimeType,
-          destination,
-          shard,
-          window,
-          paneInfo);
       WritableByteChannel tempChannel = FileSystems.create(outputFile, channelMimeType);
       try {
         channel = factory.create(tempChannel);
@@ -960,6 +905,26 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       LOG.debug("Starting write of bundle {} to {}.", this.id, outputFile);
     }
 
+    /** Called for each value in the bundle. */
+    public abstract void write(OutputT value) throws Exception;
+
+    public ResourceId getOutputFile() {
+      return outputFile;
+    }
+
+    // Helper function to close a channel, on exception cases.
+    // Always throws prior exception, with any new closing exception suppressed.
+    private static void closeChannelAndThrow(
+        WritableByteChannel channel, ResourceId filename, Exception prior) throws Exception {
+      try {
+        channel.close();
+      } catch (Exception e) {
+        LOG.error("Closing channel for {} failed.", filename, e);
+        prior.addSuppressed(e);
+        throw prior;
+      }
+    }
+
     public final void cleanup() throws Exception {
       if (outputFile != null) {
         LOG.info("Deleting temporary file {}", outputFile);
@@ -970,22 +935,19 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     }
 
     /** Closes the channel and returns the bundle result. */
-    public final FileResult<DestinationT> close() throws Exception {
+    public final void close() throws Exception {
       checkState(outputFile != null, "FileResult.close cannot be called with a null outputFile");
+      LOG.debug("Closing {}", outputFile);
 
-      LOG.debug("Writing footer to {}.", outputFile);
       try {
         writeFooter();
       } catch (Exception e) {
-        LOG.error("Writing footer to {} failed, closing channel.", outputFile, e);
         closeChannelAndThrow(channel, outputFile, e);
       }
 
-      LOG.debug("Finishing write to {}.", outputFile);
       try {
         finishWrite();
       } catch (Exception e) {
-        LOG.error("Finishing write to {} failed, closing channel.", outputFile, e);
         closeChannelAndThrow(channel, outputFile, e);
       }
 
@@ -1001,11 +963,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       } catch (Exception e) {
         throw new IOException(String.format("Failed closing channel to %s", outputFile), e);
       }
-
-      FileResult<DestinationT> result =
-          new FileResult<>(outputFile, shard, window, paneInfo, destination);
       LOG.info("Successfully wrote temporary file {}", outputFile);
-      return result;
     }
 
     /** Return the WriteOperation that this Writer belongs to. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index c99abce..35b28a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -425,18 +425,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         if (writers.size() <= maxNumWritersPerBundle) {
           String uuid = UUID.randomUUID().toString();
           LOG.info(
-              "Opening writer {} for write operation {}, window {} pane {} destination {}",
+              "Opening writer {} for window {} pane {} destination {}",
               uuid,
-              writeOperation,
               window,
               paneInfo,
               destination);
           writer = writeOperation.createWriter();
-          if (windowedWrites) {
-            writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination);
-          } else {
-            writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination);
-          }
+          writer.open(uuid, destination);
           writers.put(key, writer);
           LOG.debug("Done opening writer");
         } else {
@@ -461,17 +456,23 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public void finishBundle(FinishBundleContext c) throws Exception {
       for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
           writers.entrySet()) {
+        WriterKey<DestinationT> key = entry.getKey();
         Writer<DestinationT, OutputT> writer = entry.getValue();
-        FileResult<DestinationT> result;
         try {
-          result = writer.close();
+          writer.close();
         } catch (Exception e) {
           // If anything goes wrong, make sure to delete the temporary file.
           writer.cleanup();
           throw e;
         }
-        BoundedWindow window = entry.getKey().window;
-        c.output(result, window.maxTimestamp(), window);
+        BoundedWindow window = key.window;
+        FileResult<DestinationT> res =
+            windowedWrites
+                ? new FileResult<>(
+                    writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
+                : new FileResult<>(
+                    writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
+        c.output(res, window.maxTimestamp(), window);
       }
     }
 
@@ -505,20 +506,15 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         DestinationT destination = sink.getDynamicDestinations().getDestination(input);
         Writer<DestinationT, OutputT> writer = writers.get(destination);
         if (writer == null) {
-          LOG.debug("Opening writer for write operation {}", writeOperation);
+          String uuid = UUID.randomUUID().toString();
+          LOG.info(
+              "Opening writer {} for window {} pane {} destination {}",
+              uuid,
+              window,
+              c.pane(),
+              destination);
           writer = writeOperation.createWriter();
-          int shardNumber =
-              shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                  ? c.element().getKey().getShardNumber()
-                  : UNKNOWN_SHARDNUM;
-          if (windowedWrites) {
-            writer.openWindowed(
-                UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination);
-          } else {
-            writer.openUnwindowed(
-                UUID.randomUUID().toString(), shardNumber, destination);
-          }
-          LOG.debug("Done opening writer");
+          writer.open(uuid, destination);
           writers.put(destination, writer);
         }
         writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
@@ -527,16 +523,26 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // Close all writers.
       for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
         Writer<DestinationT, OutputT> writer = entry.getValue();
-        FileResult<DestinationT> result;
         try {
           // Close the writer; if this throws let the error propagate.
-          result = writer.close();
-          c.output(result);
+          writer.close();
         } catch (Exception e) {
           // If anything goes wrong, make sure to delete the temporary file.
           writer.cleanup();
           throw e;
         }
+        int shardNumber =
+            shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
+                ? c.element().getKey().getShardNumber()
+                : UNKNOWN_SHARDNUM;
+        if (windowedWrites) {
+          c.output(
+              new FileResult<>(
+                  writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
+        } else {
+          c.output(
+              new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
+        }
       }
     }
 
@@ -998,11 +1004,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             existingResults.size(),
             destination);
         for (int shard : missingShardNums) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination);
           Writer<DestinationT, ?> writer = writeOperation.createWriter();
           // Currently this code path is only called in the unwindowed case.
-          writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination);
-          FileResult<DestinationT> emptyWrite = writer.close();
-          completeResults.add(emptyWrite);
+          writer.open(uuid, destination);
+          writer.close();
+          completeResults.add(
+              new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
         }
         LOG.debug("Done creating extra shards for {}.", destination);
       }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 29f3c1b..f7988bb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -102,14 +102,12 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter<Void> writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.openUnwindowed(testUid, -1, null);
+    writer.open(testUid, null);
     for (String value : values) {
       writer.write(value);
     }
-    FileResult result = writer.close();
-
-    FileBasedSink sink = writer.getWriteOperation().getSink();
-    assertEquals(expectedTempFile, result.getTempFilename());
+    writer.close();
+    assertEquals(expectedTempFile, writer.getOutputFile());
     assertFileContains(expected, expectedTempFile);
   }
 
@@ -514,12 +512,11 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1, null);
+    writer.open(testUid, null);
     writer.write("a");
     writer.write("b");
-    final FileResult result = writer.close();
-
-    assertEquals(expectedFile, result.getTempFilename());
+    writer.close();
+    assertEquals(expectedFile, writer.getOutputFile());
     assertFileContains(expected, expectedFile);
   }
 


[beam] 06/13: Unifies windowed and unwindowed finalize.

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c6154382263a68d7eca893c7da3617d177e4c1df
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 20:19:09 2017 -0800

    Unifies windowed and unwindowed finalize.
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 232 ++++++++-------------
 .../java/org/apache/beam/sdk/transforms/Reify.java |  73 ++++++-
 .../apache/beam/sdk/values/TypeDescriptors.java    |   4 +
 3 files changed, 163 insertions(+), 146 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 9cfabfe..87459e9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -42,11 +42,11 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -55,14 +55,13 @@ import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reify;
 import org.apache.beam.sdk.transforms.Reshuffle;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -653,6 +652,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   .discardingFiredPanes());
     }
 
+    final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations =
+        writeOperation.getSink().getDynamicDestinations();
+
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the
     // WriteOperation as a side input) and collect the results of the writes in a
     // PCollection. There is a dependency between this ParDo and the first (the
@@ -663,19 +665,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
         ? ImmutableList.<PCollectionView<Integer>>of()
         : ImmutableList.of(numShardsView);
-    SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards =
-        new SerializableFunction<DoFn.ProcessContext, Integer>() {
-          @Override
-          public Integer apply(DoFn<?, ?>.ProcessContext c) {
-            if (numShardsView != null) {
-              return c.sideInput(numShardsView);
-            } else if (numShardsProvider != null) {
-              return numShardsProvider.get();
-            } else {
-              return null;
-            }
-          }
-        };
 
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
@@ -683,13 +672,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     final Coder<DestinationT> destinationCoder;
     try {
       destinationCoder =
-          sink.getDynamicDestinations()
-              .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+          destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
       destinationCoder.verifyDeterministic();
     } catch (CannotProvideCoderException | NonDeterministicException e) {
       throw new RuntimeException(e);
     }
-    FileResultCoder<DestinationT> fileResultCoder =
+    final FileResultCoder<DestinationT> fileResultCoder =
         FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
     PCollection<FileResult<DestinationT>> results;
@@ -749,155 +737,109 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
     results.setCoder(fileResultCoder);
 
-    PCollection<KV<DestinationT, String>> outputFilenames;
+    PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles;
     if (windowedWrites) {
-      // We need to materialize the FileResult's before the renaming stage: this can be done either
-      // via a side input or via a GBK. However, when processing streaming windowed writes, results
-      // will arrive multiple times. This means we can't share the below implementation that turns
-      // the results into a side input, as new data arriving into a side input does not trigger the
-      // listening DoFn. We also can't use a GBK because we need only the materialization, but not
-      // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK
-      // would give. So, we use a reshuffle (over a single key to maximize bundling).
-      outputFilenames =
-          results
-              .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
-              .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder()))
-              .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
-              .apply(Values.<FileResult<DestinationT>>create())
+      // Reshuffle the results to make them stable against retries.
+      // Use a single void key to maximize size of bundles for finalization.
+      PCollection<FileResult<DestinationT>> stableResults = results
+          .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
+          .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
+          .apply("Drop key", Values.<FileResult<DestinationT>>create());
+      fileResultBundles =
+          stableResults
               .apply(
-                  "FinalizeWindowed",
-                  ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation))
-                      .withSideInputs(shardingSideInputs))
-              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+                  "Gather bundles",
+                  ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>()))
+              .setCoder(IterableCoder.of(fileResultCoder));
     } else {
-      PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
-          results.apply(View.<FileResult<DestinationT>>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first
-      // do-once ParDo. There is a dependency between this ParDo and the parallel write (the writer
-      // results collection as a side input), so it will happen after the parallel write.
-      // For the non-windowed case, we guarantee that  if no data is written but the user has
-      // set numShards, then all shards will be written out as empty files. For this reason we
-      // use a side input here.
-      outputFilenames =
-          p.apply(Create.of((Void) null))
-              .apply(
-                  "FinalizeUnwindowed",
-                  ParDo.of(
-                          new FinalizeUnwindowedFn<>(
-                              getFixedNumShards, resultsView, writeOperation))
-                      .withSideInputs(
-                          FluentIterable.concat(sideInputs, shardingSideInputs)
-                              .append(resultsView)
-                              .toList()))
-              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+      // Pass results via a side input rather than reshuffle, because we need to get an empty
+      // iterable to finalize if there are no results.
+      fileResultBundles =
+          p.apply(
+              Reify.viewInGlobalWindow(
+                  results.apply(View.<FileResult<DestinationT>>asIterable()),
+                  IterableCoder.of(fileResultCoder)));
     }
 
+    class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+        @Nullable Integer fixedNumShards;
+        if (numShardsView != null) {
+          fixedNumShards = c.sideInput(numShardsView);
+        } else if (numShardsProvider != null) {
+          fixedNumShards = numShardsProvider.get();
+        } else {
+          checkState(!windowedWrites, "Windowed write should have set fixed sharding");
+          fixedNumShards = null;
+        }
+        List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
+        LOG.info("Finalizing {} file results", fileResults.size());
+        DestinationT defaultDest = destinations.getDefaultDestination();
+        List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+            fileResults.isEmpty()
+                ? writeOperation.finalizeDestination(
+                defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
+                : finalizeAllDestinations(fileResults, fixedNumShards);
+        for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+          FileResult<DestinationT> res = entry.getKey();
+          c.output(KV.of(res.getDestination(), entry.getValue().toString()));
+        }
+        writeOperation.moveToOutputFiles(resultsToFinalFilenames);
+      }
+    }
+
+    List<PCollectionView<?>> sideInputs =
+        FluentIterable.concat(this.sideInputs, shardingSideInputs).toList();
+    PCollection<KV<DestinationT, String>> outputFilenames =
+        fileResultBundles
+            .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs))
+            .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
+
     TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
         new TupleTag<>("perDestinationOutputFilenames");
     return WriteFilesResult.in(
         input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
   }
 
-  private static class FinalizeWindowedFn<DestinationT>
-      extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
-    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
-    private final WriteOperation<DestinationT, ?> writeOperation;
-
-    @Nullable private transient List<FileResult<DestinationT>> fileResults;
-    @Nullable private Integer fixedNumShards;
-
-    public FinalizeWindowedFn(
-        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
-        WriteOperation<DestinationT, ?> writeOperation) {
-      this.getFixedNumShards = getFixedNumShards;
-      this.writeOperation = writeOperation;
+  private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
+      List<FileResult<DestinationT>> fileResults, @Nullable Integer fixedNumShards)
+      throws Exception {
+    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
+        ArrayListMultimap.create();
+    for (FileResult<DestinationT> result : fileResults) {
+      res.put(KV.of(result.getDestination(), result.getWindow()), result);
+    }
+    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+    for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+        destEntry : res.asMap().entrySet()) {
+      KV<DestinationT, BoundedWindow> destWindow = destEntry.getKey();
+      resultsToFinalFilenames.addAll(
+          writeOperation.finalizeDestination(
+              destWindow.getKey(), destWindow.getValue(), fixedNumShards, destEntry.getValue()));
     }
+    return resultsToFinalFilenames;
+  }
+
+  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> {
+    @Nullable private transient Multimap<BoundedWindow, T> bundles = null;
 
     @StartBundle
     public void startBundle() {
-      fileResults = Lists.newArrayList();
-      fixedNumShards = null;
+      bundles = ArrayListMultimap.create();
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) {
-      fileResults.add(c.element());
-      if (fixedNumShards == null) {
-        fixedNumShards = getFixedNumShards.apply(c);
-        checkState(fixedNumShards != null, "Windowed write should have set fixed sharding");
-      }
+    public void process(ProcessContext c, BoundedWindow w) {
+      bundles.put(w, c.element());
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
-          finalizeAllDestinations(writeOperation, fileResults, fixedNumShards);
-      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
-        FileResult<DestinationT> res = entry.getKey();
-        c.output(
-            KV.of(res.getDestination(), entry.getValue().toString()),
-            res.getWindow().maxTimestamp(),
-            res.getWindow());
+      for (BoundedWindow w : bundles.keySet()) {
+        c.output(Lists.newArrayList(bundles.get(w)), w.maxTimestamp(), w);
       }
-      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
   }
-
-  private static class FinalizeUnwindowedFn<DestinationT>
-      extends DoFn<Void, KV<DestinationT, String>> {
-    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
-    private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
-    private final WriteOperation<DestinationT, ?> writeOperation;
-
-    public FinalizeUnwindowedFn(
-        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
-        PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
-        WriteOperation<DestinationT, ?> writeOperation) {
-      this.getFixedNumShards = getFixedNumShards;
-      this.resultsView = resultsView;
-      this.writeOperation = writeOperation;
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView));
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
-          fileResults.isEmpty()
-              ? writeOperation.finalizeDestination(
-                  writeOperation.getSink().getDynamicDestinations().getDefaultDestination(),
-                  GlobalWindow.INSTANCE,
-                  getFixedNumShards.apply(c),
-                  ImmutableList.<FileResult<DestinationT>>of())
-              : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c));
-      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
-        c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
-      }
-      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
-    }
-  }
-
-  private static <DestinationT>
-      List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
-          WriteOperation<DestinationT, ?> writeOperation,
-          List<FileResult<DestinationT>> fileResults,
-          Integer fixedNumShards)
-          throws Exception {
-    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
-    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap =
-        groupByDestinationAndWindow(fileResults);
-    for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
-        destEntry : resultsByDestMultimap.asMap().entrySet()) {
-      resultsToFinalFilenames.addAll(
-          writeOperation.finalizeDestination(
-              destEntry.getKey().getKey(),
-              destEntry.getKey().getValue(),
-              fixedNumShards,
-              destEntry.getValue()));
-    }
-    return resultsToFinalFilenames;
-  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
index caa89e6..7f5c881 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reify.java
@@ -18,17 +18,69 @@
 
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
 
-/** {@link PTransform PTransforms} for reifying the timestamp, window and pane of values. */
+/**
+ * {@link PTransform PTransforms} for converting between explicit and implicit form of various Beam
+ * values.
+ */
 public class Reify {
+  private static class ReifyView<K, V>
+  extends PTransform<PCollection<K>, PCollection<KV<K, V>>> {
+    private final PCollectionView<V> view;
+    private final Coder<V> coder;
+
+    private ReifyView(PCollectionView<V> view, Coder<V> coder) {
+      this.view = view;
+      this.coder = coder;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> expand(PCollection<K> input) {
+      return input
+          .apply(
+              ParDo.of(
+                      new DoFn<K, KV<K, V>>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output(KV.of(c.element(), c.sideInput(view)));
+                        }
+                      })
+                  .withSideInputs(view))
+          .setCoder(KvCoder.of(input.getCoder(), coder));
+    }
+  }
+
+  private static class ReifyViewInGlobalWindow<V>
+  extends PTransform<PBegin, PCollection<V>> {
+    private final PCollectionView<V> view;
+    private final Coder<V> coder;
+
+    private ReifyViewInGlobalWindow(PCollectionView<V> view, Coder<V> coder) {
+      this.view = view;
+      this.coder = coder;
+    }
+
+    @Override
+    public PCollection<V> expand(PBegin input) {
+      return input
+          .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
+          .apply(Reify.<Void, V>viewAsValues(view, coder))
+          .apply(Values.<V>create());
+    }
+  }
+
   /** Private implementation of {@link #windows()}. */
   private static class Window<T>
       extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
@@ -184,9 +236,28 @@ public class Reify {
     return new WindowInValue<>();
   }
 
+  /** Extracts the timestamps from each value in a {@link KV}. */
   public static <K, V>
       PTransform<PCollection<KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
           extractTimestampsFromValues() {
     return new ExtractTimestampsFromValues<>();
   }
+
+  /**
+   * Pairs each element in a collection with the value of a side input associated with the element's
+   * window.
+   */
+  public static <K, V> PTransform<PCollection<K>, PCollection<KV<K, V>>> viewAsValues(
+      PCollectionView<V> view, Coder<V> coder) {
+    return new ReifyView<>(view, coder);
+  }
+
+  /**
+   * Returns a {@link PCollection} consisting of a single element, containing the value of the given
+   * view in the global window.
+   */
+  public static <K, V> PTransform<PBegin, PCollection<V>> viewInGlobalWindow(
+      PCollectionView<V> view, Coder<V> coder) {
+    return new ReifyViewInGlobalWindow<>(view, coder);
+  }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index e59f84b..8ef2a4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -291,6 +291,10 @@ public class TypeDescriptors {
     return typeDescriptor;
   }
 
+  public static TypeDescriptor<Void> voids() {
+    return new TypeDescriptor<Void>() {};
+  }
+
   /**
    * A helper interface for use with {@link #extractFromTypeParameters(Object, Class,
    * TypeVariableExtractor)}.


[beam] 07/13: Refactors WriteFiles into sub-transforms

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 2f73a9534a709dd1fe775ab31e0598d2d89f123c
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Fri Nov 17 12:25:45 2017 -0800

    Refactors WriteFiles into sub-transforms
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 630 +++++++++++----------
 1 file changed, 322 insertions(+), 308 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 87459e9..0a538b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,7 +24,6 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -37,7 +36,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -47,6 +45,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
 import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
@@ -176,53 +175,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     return PCollectionViews.toAdditionalInputs(sideInputs);
   }
 
-  @Override
-  public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
-    if (input.isBounded() == IsBounded.UNBOUNDED) {
-      checkArgument(windowedWrites,
-          "Must use windowed writes when applying %s to an unbounded PCollection",
-          WriteFiles.class.getSimpleName());
-    }
-    if (windowedWrites) {
-      // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
-      // and similar behavior in other runners.
-      checkArgument(
-          computeNumShards != null || numShardsProvider != null,
-          "When using windowed writes, must specify number of output shards explicitly",
-          WriteFiles.class.getSimpleName());
-    }
-    this.writeOperation = sink.createWriteOperation();
-    this.writeOperation.setWindowedWrites(windowedWrites);
-    return createWrite(input);
-  }
-
-  @Override
-  public void validate(PipelineOptions options) {
-    sink.validate(options);
-  }
-
-  @Override
-  public void populateDisplayData(DisplayData.Builder builder) {
-    super.populateDisplayData(builder);
-    builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
-    } else {
-      builder.addIfNotNull(DisplayData.item("numShards", getNumShards())
-          .withLabel("Fixed Number of Shards"));
-    }
-  }
-
   /** Returns the {@link FileBasedSink} associated with this PTransform. */
   public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
     return sink;
   }
 
-  /**
-   * Returns whether or not to perform windowed writes.
-   */
+  /** Returns whether or not to perform windowed writes. */
   public boolean isWindowedWrites() {
     return windowedWrites;
   }
@@ -339,50 +297,189 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
   }
 
-  private static class WriterKey<DestinationT> {
-    private final BoundedWindow window;
-    private final PaneInfo paneInfo;
-    private final DestinationT destination;
+  @Override
+  public void validate(PipelineOptions options) {
+    sink.validate(options);
+  }
 
-    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
-      this.window = window;
-      this.paneInfo = paneInfo;
-      this.destination = destination;
+  @Override
+  public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
+    if (input.isBounded() == IsBounded.UNBOUNDED) {
+      checkArgument(
+          windowedWrites,
+          "Must use windowed writes when applying %s to an unbounded PCollection",
+          WriteFiles.class.getSimpleName());
+    }
+    if (windowedWrites) {
+      // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
+      // and similar behavior in other runners.
+      checkArgument(
+          computeNumShards != null || numShardsProvider != null,
+          "When using windowed writes, must specify number of output shards explicitly",
+          WriteFiles.class.getSimpleName());
     }
+    this.writeOperation = sink.createWriteOperation();
+    this.writeOperation.setWindowedWrites(windowedWrites);
 
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof WriterKey)) {
-        return false;
-      }
-      WriterKey other = (WriterKey) o;
-      return Objects.equal(window, other.window)
-          && Objects.equal(paneInfo, other.paneInfo)
-          && Objects.equal(destination, other.destination);
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing triggers.
+      input =
+          input.apply(
+              "RewindowIntoGlobal",
+              Window.<UserT>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
+
+    Coder<DestinationT> destinationCoder;
+    try {
+      destinationCoder =
+          getDynamicDestinations()
+              .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
+      destinationCoder.verifyDeterministic();
+    } catch (CannotProvideCoderException | NonDeterministicException e) {
+      throw new RuntimeException(e);
+    }
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
+    FileResultCoder<DestinationT> fileResultCoder =
+        FileResultCoder.of(windowCoder, destinationCoder);
+
+    PCollectionView<Integer> numShardsView =
+        (computeNumShards == null) ? null : input.apply(computeNumShards);
+
+    PCollection<FileResult<DestinationT>> tempFileResults =
+        (computeNumShards == null && numShardsProvider == null)
+            ? input.apply(
+                "WriteUnshardedBundlesToTempFiles",
+                new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
+            : input.apply(
+                "WriteShardedBundlesToTempFiles",
+                new WriteShardedBundlesToTempFiles(
+                    destinationCoder, fileResultCoder, numShardsView));
+
+    return tempFileResults
+        .apply("GatherTempFileResults", new GatherResults<>(fileResultCoder))
+        .apply(
+            "FinalizeTempFileBundles",
+            new FinalizeTempFileBundles(numShardsView, destinationCoder));
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
+        .include("sink", sink);
+    if (getSharding() != null) {
+      builder.include("sharding", getSharding());
+    } else {
+      builder.addIfNotNull(
+          DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"));
+    }
+  }
+
+  private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
+    return (DynamicDestinations<UserT, DestinationT, OutputT>)
+        writeOperation.getSink().getDynamicDestinations();
+  }
+
+  private class GatherResults<ResultT>
+      extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+    private final Coder<ResultT> resultCoder;
+
+    private GatherResults(Coder<ResultT> resultCoder) {
+      this.resultCoder = resultCoder;
     }
 
     @Override
-    public int hashCode() {
-      return Objects.hashCode(window, paneInfo, destination);
+    public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+      if (windowedWrites) {
+        // Reshuffle the results to make them stable against retries.
+        // Use a single void key to maximize size of bundles for finalization.
+        return input
+            .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null))
+            .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
+            .apply("Drop key", Values.<ResultT>create())
+            .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
+            .setCoder(IterableCoder.of(resultCoder));
+      } else {
+        // Pass results via a side input rather than reshuffle, because we need to get an empty
+        // iterable to finalize if there are no results.
+        return input
+            .getPipeline()
+            .apply(
+                Reify.viewInGlobalWindow(
+                    input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+      }
     }
   }
 
-  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
-  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
-  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
-  // this can be used as a key.
-  private static <DestinationT> int hashDestination(
-      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
-    return Hashing.murmur3_32()
-        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
-        .asInt();
+  private class WriteUnshardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+
+    private WriteUnshardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+      PCollectionTuple writeTuple =
+          input.apply(
+              "WriteUnshardedBundles",
+              ParDo.of(
+                      new WriteUnshardedTempFilesWithSpillingFn(
+                          spilledRecordsTag, destinationCoder))
+                  .withSideInputs(sideInputs)
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+      PCollection<FileResult<DestinationT>> writtenBundleFiles =
+          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
+      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
+      // finalize to stay consistent with what WriteWindowedBundles does.
+      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
+          writeTuple
+              .get(spilledRecordsTag)
+              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+              // Here we group by a synthetic shard number in the range [0, spill factor),
+              // just for the sake of getting some parallelism within each destination when
+              // writing the spilled records, whereas the non-spilled records don't have a shard
+              // number assigned at all. Drop the shard number on the spilled records so that
+              // shard numbers are assigned together to both the spilled and non-spilled files in
+              // finalize.
+              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply(
+                  "WriteSpilled",
+                  ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+              .setCoder(fileResultCoder)
+              .apply(
+                  "DropShardNum",
+                  ParDo.of(
+                      new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
+                        @ProcessElement
+                        public void process(ProcessContext c) {
+                          c.output(c.element().withShard(UNKNOWN_SHARDNUM));
+                        }
+                      }));
+      return PCollectionList.of(writtenBundleFiles)
+          .and(writtenSpilledFiles)
+          .apply(Flatten.<FileResult<DestinationT>>pCollections())
+          .setCoder(fileResultCoder);
+    }
   }
 
   /**
    * Writes all the elements in a bundle using a {@link Writer} produced by the {@link
    * WriteOperation} associated with the {@link FileBasedSink}.
    */
-  private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
+  private class WriteUnshardedTempFilesWithSpillingFn
+      extends DoFn<UserT, FileResult<DestinationT>> {
     private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
     private final Coder<DestinationT> destinationCoder;
 
@@ -391,7 +488,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
-    WriteBundles(
+    WriteUnshardedTempFilesWithSpillingFn(
         TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
         Coder<DestinationT> destinationCoder) {
       this.unwrittenRecordsTag = unwrittenRecordsTag;
@@ -406,14 +503,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+      getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
       PaneInfo paneInfo = c.pane();
       // If we are doing windowed writes, we need to ensure that we have separate files for
       // data in different windows/panes. Similar for dynamic writes, make sure that different
       // destinations go to different writers.
       // In the case of unwindowed writes, the window and the pane will always be the same, and
       // the map will only have a single element.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(c.element());
+      DestinationT destination = getDynamicDestinations().getDestination(c.element());
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
       Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
@@ -444,7 +541,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           return;
         }
       }
-      writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element()));
+      writeOrClose(writer, getDynamicDestinations().formatRecord(c.element()));
     }
 
     @FinishBundle
@@ -468,64 +565,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             window);
       }
     }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(WriteFiles.this);
-    }
-  }
-
-  /*
-   * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
-   * single iterable.
-   */
-  private class WriteShardedBundles
-      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
-      sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
-      // in this iterable. The number of destinations is generally very small (1000s or less), so
-      // there will rarely be hash collisions.
-      Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
-      for (UserT input : c.element().getValue()) {
-        DestinationT destination = sink.getDynamicDestinations().getDestination(input);
-        Writer<DestinationT, OutputT> writer = writers.get(destination);
-        if (writer == null) {
-          String uuid = UUID.randomUUID().toString();
-          LOG.info(
-              "Opening writer {} for window {} pane {} destination {}",
-              uuid,
-              window,
-              c.pane(),
-              destination);
-          writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
-          writers.put(destination, writer);
-        }
-        writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input));
-      }
-
-      // Close all writers.
-      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
-        Writer<DestinationT, OutputT> writer = entry.getValue();
-        try {
-          // Close the writer; if this throws let the error propagate.
-          writer.close();
-        } catch (Exception e) {
-          // If anything goes wrong, make sure to delete the temporary file.
-          writer.cleanup();
-          throw e;
-        }
-        int shard = c.element().getKey().getShardNumber();
-        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
-      }
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.delegate(WriteFiles.this);
-    }
   }
 
   private static <DestinationT, OutputT> void writeOrClose(
@@ -549,21 +588,90 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
+  private static class WriterKey<DestinationT> {
+    private final BoundedWindow window;
+    private final PaneInfo paneInfo;
+    private final DestinationT destination;
+
+    WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
+      this.window = window;
+      this.paneInfo = paneInfo;
+      this.destination = destination;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof WriterKey)) {
+        return false;
+      }
+      WriterKey other = (WriterKey) o;
+      return Objects.equal(window, other.window)
+          && Objects.equal(paneInfo, other.paneInfo)
+          && Objects.equal(destination, other.destination);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(window, paneInfo, destination);
+    }
+  }
+
+  // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
+  // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
+  // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
+  // this can be used as a key.
+  private static <DestinationT> int hashDestination(
+      DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
+    return Hashing.murmur3_32()
+        .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
+        .asInt();
+  }
+
+  private class WriteShardedBundlesToTempFiles
+      extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
+    private final Coder<DestinationT> destinationCoder;
+    private final Coder<FileResult<DestinationT>> fileResultCoder;
+    private final PCollectionView<Integer> numShardsView;
+
+    private WriteShardedBundlesToTempFiles(
+        Coder<DestinationT> destinationCoder,
+        Coder<FileResult<DestinationT>> fileResultCoder,
+        PCollectionView<Integer> numShardsView) {
+      this.destinationCoder = destinationCoder;
+      this.fileResultCoder = fileResultCoder;
+      this.numShardsView = numShardsView;
+    }
+
+    @Override
+    public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
+      return input
+          .apply(
+              "ApplyShardingKey",
+              ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
+                  .withSideInputs(
+                      numShardsView == null
+                          ? ImmutableList.<PCollectionView<Integer>>of()
+                          : ImmutableList.of(numShardsView)))
+          .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
+          .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
+          .apply(
+              "WriteShardsIntoTempFiles",
+              ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+          .setCoder(fileResultCoder);
+    }
+  }
+
+  private class ApplyShardingKeyFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
     private final @Nullable PCollectionView<Integer> numShardsView;
-    private final ValueProvider<Integer> numShardsProvider;
     private final Coder<DestinationT> destinationCoder;
 
     private int shardNumber;
 
-    ApplyShardingKey(
-        PCollectionView<Integer> numShardsView,
-        ValueProvider<Integer> numShardsProvider,
-        Coder<DestinationT> destinationCoder) {
-      this.destinationCoder = destinationCoder;
+    ApplyShardingKeyFn(
+        @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
       this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
-      shardNumber = UNKNOWN_SHARDNUM;
+      this.destinationCoder = destinationCoder;
+      this.shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
@@ -595,7 +703,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       // the destinations. This does mean that multiple destinations might end up on the same shard,
       // however the number of collisions should be small, so there's no need to worry about memory
       // issues.
-      DestinationT destination = sink.getDynamicDestinations().getDestination(context.element());
+      DestinationT destination = getDynamicDestinations().getDestination(context.element());
       context.output(
           KV.of(
               ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber),
@@ -603,168 +711,86 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  private static <DestinationT>
-      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>>
-          groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) {
-    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
-        ArrayListMultimap.create();
-    for (FileResult<DestinationT> result : results) {
-      res.put(KV.of(result.getDestination(), result.getWindow()), result);
+  private class WriteShardsIntoTempFilesFn
+      extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+      // Since we key by a 32-bit hash of the destination, there might be multiple destinations
+      // in this iterable. The number of destinations is generally very small (1000s or less), so
+      // there will rarely be hash collisions.
+      Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
+      for (UserT input : c.element().getValue()) {
+        DestinationT destination = getDynamicDestinations().getDestination(input);
+        Writer<DestinationT, OutputT> writer = writers.get(destination);
+        if (writer == null) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info(
+              "Opening writer {} for window {} pane {} destination {}",
+              uuid,
+              window,
+              c.pane(),
+              destination);
+          writer = writeOperation.createWriter();
+          writer.open(uuid, destination);
+          writers.put(destination, writer);
+        }
+        writeOrClose(writer, getDynamicDestinations().formatRecord(input));
+      }
+
+      // Close all writers.
+      for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
+        Writer<DestinationT, OutputT> writer = entry.getValue();
+        try {
+          // Close the writer; if this throws let the error propagate.
+          writer.close();
+        } catch (Exception e) {
+          // If anything goes wrong, make sure to delete the temporary file.
+          writer.cleanup();
+          throw e;
+        }
+        int shard = c.element().getKey().getShardNumber();
+        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
+      }
     }
-    return res;
   }
 
-  /**
-   * A write is performed as sequence of three {@link ParDo}'s.
-   *
-   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-   * ParDo over the PCollection of elements to write. In this bundle-writing phase, {@link
-   * WriteOperation#createWriter} is called to obtain a {@link Writer}. {@link Writer#open} and
-   * {@link Writer#close} are called in {@link DoFn.StartBundle} and {@link DoFn.FinishBundle},
-   * respectively, and {@link Writer#write} method is called for every element in the bundle. The
-   * output of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
-   * FileBasedSink} for a description of writer results)-one for each bundle.
-   *
-   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
-   * the write.
-   *
-   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-   * before the exception that caused the write to fail is propagated and the write result will be
-   * discarded.
-   *
-   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-   * deserialized in the bundle-writing and finalization phases, any state change to the
-   * WriteOperation object that occurs during initialization is visible in the latter phases.
-   * However, the WriteOperation is not serialized after the bundle-writing phase. This is why
-   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-   * WriteOperation).
-   */
-  private WriteFilesResult<DestinationT> createWrite(PCollection<UserT> input) {
-    Pipeline p = input.getPipeline();
+  private class FinalizeTempFileBundles
+      extends PTransform<
+          PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+    @Nullable private final PCollectionView<Integer> numShardsView;
+    private final Coder<DestinationT> destinationCoder;
 
-    if (!windowedWrites) {
-      // Re-window the data into the global window and remove any existing triggers.
-      input =
-          input.apply(
-              Window.<UserT>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
+    private FinalizeTempFileBundles(
+        @Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
+      this.numShardsView = numShardsView;
+      this.destinationCoder = destinationCoder;
     }
 
-    final FileBasedSink.DynamicDestinations<?, DestinationT, OutputT> destinations =
-        writeOperation.getSink().getDynamicDestinations();
-
-    // Perform the per-bundle writes as a ParDo on the input PCollection (with the
-    // WriteOperation as a side input) and collect the results of the writes in a
-    // PCollection. There is a dependency between this ParDo and the first (the
-    // WriteOperation PCollection as a side input), so this will happen after the
-    // initial ParDo.
-    final PCollectionView<Integer> numShardsView =
-        (computeNumShards == null) ? null : input.apply(computeNumShards);
-    List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
-        ? ImmutableList.<PCollectionView<Integer>>of()
-        : ImmutableList.of(numShardsView);
+    @Override
+    public WriteFilesResult<DestinationT> expand(
+        PCollection<Iterable<FileResult<DestinationT>>> input) {
 
-    @SuppressWarnings("unchecked")
-    Coder<BoundedWindow> shardedWindowCoder =
-        (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
-    final Coder<DestinationT> destinationCoder;
-    try {
-      destinationCoder =
-          destinations.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
-      destinationCoder.verifyDeterministic();
-    } catch (CannotProvideCoderException | NonDeterministicException e) {
-      throw new RuntimeException(e);
-    }
-    final FileResultCoder<DestinationT> fileResultCoder =
-        FileResultCoder.of(shardedWindowCoder, destinationCoder);
-
-    PCollection<FileResult<DestinationT>> results;
-    if (computeNumShards == null && numShardsProvider == null) {
-      TupleTag<FileResult<DestinationT>> writtenRecordsTag =
-          new TupleTag<>("writtenRecordsTag");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
-          new TupleTag<>("spilledRecordsTag");
-      String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
-      PCollectionTuple writeTuple =
-          input.apply(
-              writeName,
-              ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
-                  .withSideInputs(sideInputs)
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
-      PCollection<FileResult<DestinationT>> writtenBundleFiles =
-          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
-      // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
-      // finalize to stay consistent with what WriteWindowedBundles does.
-      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
-          writeTuple
-              .get(spilledRecordsTag)
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              // Here we group by a synthetic shard number in the range [0, spill factor),
-              // just for the sake of getting some parallelism within each destination when
-              // writing the spilled records, whereas the non-spilled records don't have a shard
-              // number assigned at all. Drop the shard number on the spilled records so that
-              // shard numbers are assigned together to both the spilled and non-spilled files in
-              // finalize.
-              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
-              .apply(
-                  "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
-              .setCoder(fileResultCoder)
-              .apply("DropShardNum", ParDo.of(
-                  new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
-                    @ProcessElement
-                    public void process(ProcessContext c) {
-                      c.output(c.element().withShard(UNKNOWN_SHARDNUM));
-                    }
-                  }));
-      results =
-          PCollectionList.of(writtenBundleFiles)
-              .and(writtenSpilledFiles)
-              .apply(Flatten.<FileResult<DestinationT>>pCollections());
-    } else {
-      results =
+      List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs);
+      if (numShardsView != null) {
+        finalizeSideInputs.add(numShardsView);
+      }
+      PCollection<KV<DestinationT, String>> outputFilenames =
           input
-              .apply(
-                  "ApplyShardLabel",
-                  ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder))
-                      .withSideInputs(shardingSideInputs))
-              .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
-              .apply(
-                  "WriteShardedBundles",
-                  ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs));
-    }
-    results.setCoder(fileResultCoder);
+              .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(finalizeSideInputs))
+              .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
 
-    PCollection<Iterable<FileResult<DestinationT>>> fileResultBundles;
-    if (windowedWrites) {
-      // Reshuffle the results to make them stable against retries.
-      // Use a single void key to maximize size of bundles for finalization.
-      PCollection<FileResult<DestinationT>> stableResults = results
-          .apply("Add void key", WithKeys.<Void, FileResult<DestinationT>>of((Void) null))
-          .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of())
-          .apply("Drop key", Values.<FileResult<DestinationT>>create());
-      fileResultBundles =
-          stableResults
-              .apply(
-                  "Gather bundles",
-                  ParDo.of(new GatherBundlesPerWindowFn<FileResult<DestinationT>>()))
-              .setCoder(IterableCoder.of(fileResultCoder));
-    } else {
-      // Pass results via a side input rather than reshuffle, because we need to get an empty
-      // iterable to finalize if there are no results.
-      fileResultBundles =
-          p.apply(
-              Reify.viewInGlobalWindow(
-                  results.apply(View.<FileResult<DestinationT>>asIterable()),
-                  IterableCoder.of(fileResultCoder)));
+      TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
+          new TupleTag<>("perDestinationOutputFilenames");
+      return WriteFilesResult.in(
+          input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
     }
 
-    class FinalizeFn extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+    private class FinalizeFn
+        extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
       @ProcessElement
       public void process(ProcessContext c) throws Exception {
-        writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
+        getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
         @Nullable Integer fixedNumShards;
         if (numShardsView != null) {
           fixedNumShards = c.sideInput(numShardsView);
@@ -776,11 +802,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         }
         List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
         LOG.info("Finalizing {} file results", fileResults.size());
-        DestinationT defaultDest = destinations.getDefaultDestination();
+        DestinationT defaultDest = getDynamicDestinations().getDefaultDestination();
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
             fileResults.isEmpty()
                 ? writeOperation.finalizeDestination(
-                defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
+                    defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
                 : finalizeAllDestinations(fileResults, fixedNumShards);
         for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
           FileResult<DestinationT> res = entry.getKey();
@@ -789,18 +815,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         writeOperation.moveToOutputFiles(resultsToFinalFilenames);
       }
     }
-
-    List<PCollectionView<?>> sideInputs =
-        FluentIterable.concat(this.sideInputs, shardingSideInputs).toList();
-    PCollection<KV<DestinationT, String>> outputFilenames =
-        fileResultBundles
-            .apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(sideInputs))
-            .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
-
-    TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
-        new TupleTag<>("perDestinationOutputFilenames");
-    return WriteFilesResult.in(
-        input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
   }
 
   private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
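
(Illustrative note, not part of the commit above: the GroupIntoShards step in WriteShardedBundlesToTempFiles can key on the destination only because the destination is hashed through its deterministic coder rather than through Java's hashCode(), which is not stable across JVMs. A minimal standalone sketch of that idea; the class name and example destination are made up.)

    import com.google.common.hash.Hashing;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    class DestinationHashSketch {
      // Serialize the destination with its coder and hash the bytes; unlike hashCode(),
      // this yields the same value on every worker, so it is safe inside a GroupByKey key.
      static <DestinationT> int stableHash(DestinationT dest, Coder<DestinationT> coder)
          throws Exception {
        return Hashing.murmur3_32()
            .hashBytes(CoderUtils.encodeToByteArray(coder, dest))
            .asInt();
      }

      public static void main(String[] args) throws Exception {
        System.out.println(stableHash("logs/2017/12/05", StringUtf8Coder.of()));
      }
    }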

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam] 04/13: remove ShardAssignment

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 54eacf4b79993f40b9034bf429e387faeffdbdba
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 18:58:28 2017 -0800

    remove ShardAssignment
---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 118 +++++++++------------
 1 file changed, 48 insertions(+), 70 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 19457e6..28ac1a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -478,19 +479,12 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     }
   }
 
-  enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING }
-
   /*
    * Like {@link WriteBundles}, but where the elements for each shard have been collected into a
    * single iterable.
    */
   private class WriteShardedBundles
       extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
-    ShardAssignment shardNumberAssignment;
-    WriteShardedBundles(ShardAssignment shardNumberAssignment) {
-      this.shardNumberAssignment = shardNumberAssignment;
-    }
-
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -527,13 +521,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           writer.cleanup();
           throw e;
         }
-        int shardNumber =
-            shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
-                ? c.element().getKey().getShardNumber()
-                : UNKNOWN_SHARDNUM;
-        c.output(
-            new FileResult<>(
-                writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
+        int shard = c.element().getKey().getShardNumber();
+        c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
       }
     }
 
@@ -672,8 +661,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollection<FileResult<DestinationT>> results;
-    final PCollectionView<Integer> numShardsView;
+    PCollectionView<Integer> numShardsView =
+        (computeNumShards == null) ? null : input.apply(computeNumShards);
+    List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
+        ? ImmutableList.<PCollectionView<Integer>>of()
+        : ImmutableList.of(numShardsView);
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -686,74 +678,65 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } catch (CannotProvideCoderException | NonDeterministicException e) {
       throw new RuntimeException(e);
     }
+    FileResultCoder<DestinationT> fileResultCoder =
+        FileResultCoder.of(shardedWindowCoder, destinationCoder);
 
+    PCollection<FileResult<DestinationT>> results;
     if (computeNumShards == null && numShardsProvider == null) {
-      numShardsView = null;
       TupleTag<FileResult<DestinationT>> writtenRecordsTag =
           new TupleTag<>("writtenRecordsTag");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittedRecordsTag =
-          new TupleTag<>("unwrittenRecordsTag");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag =
+          new TupleTag<>("spilledRecordsTag");
       String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles";
       PCollectionTuple writeTuple =
           input.apply(
               writeName,
-              ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
+              ParDo.of(new WriteBundles(spilledRecordsTag, destinationCoder))
                   .withSideInputs(sideInputs)
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
-          writeTuple
-              .get(writtenRecordsTag)
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+          writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
       // finalize to stay consistent with what WriteWindowedBundles does.
-      PCollection<FileResult<DestinationT>> writtenGroupedFiles =
+      PCollection<FileResult<DestinationT>> writtenSpilledFiles =
           writeTuple
-              .get(unwrittedRecordsTag)
+              .get(spilledRecordsTag)
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              // Here we group by a synthetic shard number in the range [0, spill factor),
+              // just for the sake of getting some parallelism within each destination when
+              // writing the spilled records, whereas the non-spilled records don't have a shard
+              // number assigned at all. Drop the shard number on the spilled records so that
+              // shard numbers are assigned together to both the spilled and non-spilled files in
+              // finalize.
+              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
-                  "WriteUnwritten",
-                  ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE))
-                      .withSideInputs(sideInputs))
-              .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+                  "WriteSpilled", ParDo.of(new WriteShardedBundles()).withSideInputs(sideInputs))
+              .setCoder(fileResultCoder)
+              .apply("DropShardNum", ParDo.of(
+                  new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
+                    @ProcessElement
+                    public void process(ProcessContext c) {
+                      c.output(c.element().withShard(UNKNOWN_SHARDNUM));
+                    }
+                  }));
       results =
           PCollectionList.of(writtenBundleFiles)
-              .and(writtenGroupedFiles)
+              .and(writtenSpilledFiles)
               .apply(Flatten.<FileResult<DestinationT>>pCollections());
     } else {
-      List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList();
-      if (computeNumShards != null) {
-        numShardsView = input.apply(computeNumShards);
-        shardingSideInputs.add(numShardsView);
-      } else {
-        numShardsView = null;
-      }
-      PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded =
+      results =
           input
               .apply(
                   "ApplyShardLabel",
-                  ParDo.of(
-                          new ApplyShardingKey(
-                              numShardsView,
-                              (numShardsView != null) ? null : numShardsProvider,
-                              destinationCoder))
+                  ParDo.of(new ApplyShardingKey(numShardsView, numShardsProvider, destinationCoder))
                       .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
-              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create());
-      shardedWindowCoder =
-          (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder();
-      // Since this path might be used by streaming runners processing triggers, it's important
-      // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE
-      // strategy works by sorting all FileResult objects and assigning them numbers, which is not
-      // guaranteed to work well when processing triggers - if the finalize step retries it might
-      // see a different Iterable of FileResult objects, and it will assign different shard numbers.
-      results =
-          sharded.apply(
-              "WriteShardedBundles",
-              ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING))
-                  .withSideInputs(sideInputs));
+              .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply(
+                  "WriteShardedBundles",
+                  ParDo.of(new WriteShardedBundles()).withSideInputs(this.sideInputs));
     }
-    results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder));
+    results.setCoder(fileResultCoder);
 
     PCollection<KV<DestinationT, String>> outputFilenames;
     if (windowedWrites) {
@@ -773,7 +756,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply(
                   "FinalizeWindowed",
                   ParDo.of(
-                          new FinalizeWindowedFn<DestinationT>(
+                          new FinalizeWindowedFn<>(
                               numShardsView, numShardsProvider, writeOperation))
                       .withSideInputs(
                           numShardsView == null
@@ -783,12 +766,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     } else {
       final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
-      ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShardsView != null) {
-        finalizeSideInputs.add(numShardsView);
-      }
-      finalizeSideInputs.addAll(sideInputs);
 
       // Finalize the write in another do-once ParDo on the singleton collection containing the
       // Writer. The results from the per-bundle writes are given as an Iterable side input.
@@ -806,16 +783,17 @@ public class WriteFiles<UserT, DestinationT, OutputT>
                   ParDo.of(
                           new FinalizeUnwindowedFn<>(
                               numShardsView, numShardsProvider, resultsView, writeOperation))
-                      .withSideInputs(finalizeSideInputs.build()))
+                      .withSideInputs(
+                          FluentIterable.concat(sideInputs, shardingSideInputs)
+                              .append(resultsView)
+                              .toList()))
               .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     }
 
     TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
         new TupleTag<>("perDestinationOutputFilenames");
     return WriteFilesResult.in(
-        input.getPipeline(),
-        perDestinationOutputFilenamesTag,
-        outputFilenames);
+        input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
   }
 
   private static class FinalizeWindowedFn<DestinationT>
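
(Illustrative note, not part of the commit above: with the ShardAssignment enum gone, spilled records carry a synthetic shard number only long enough to be grouped for parallelism, then drop it so that finalize can number spilled and non-spilled temp files together. A minimal sketch of that drop step; the class name is made up, and the sentinel value is an assumption since WriteFiles defines its own UNKNOWN_SHARDNUM.)

    import org.apache.beam.sdk.io.FileBasedSink.FileResult;
    import org.apache.beam.sdk.transforms.DoFn;

    class DropShardNumSketch<DestinationT>
        extends DoFn<FileResult<DestinationT>, FileResult<DestinationT>> {
      // Assumed sentinel for "no shard assigned yet"; the real constant lives in WriteFiles.
      private static final int UNKNOWN_SHARDNUM = -1;

      @ProcessElement
      public void process(ProcessContext c) {
        // The synthetic shard number only existed to spread spilled records across workers;
        // finalize assigns the real shard numbers later.
        c.output(c.element().withShard(UNKNOWN_SHARDNUM));
      }
    }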

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam] 03/13: non-null window/pane in FileResult

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b2d0671185fa1bd7f100853c7921e555c84578e7
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 18:25:14 2017 -0800

    non-null window/pane in FileResult
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 10 ++++--
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 38 ++++++++++------------
 2 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2108253..c8bdbfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -655,6 +655,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         checkArgument(
             result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
         ResourceId finalFilename = result.getDestinationFile(
+            windowedWrites,
             getSink().getDynamicDestinations(),
             effectiveNumShards,
             getSink().getWritableByteChannelFactory());
@@ -984,7 +985,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
   public static final class FileResult<DestinationT> {
     private final ResourceId tempFilename;
     private final int shard;
-    private final @Nullable BoundedWindow window;
+    private final BoundedWindow window;
     private final PaneInfo paneInfo;
     private final DestinationT destination;
 
@@ -992,9 +993,11 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     public FileResult(
         ResourceId tempFilename,
         int shard,
-        @Nullable BoundedWindow window,
+        BoundedWindow window,
         PaneInfo paneInfo,
         DestinationT destination) {
+      checkArgument(window != null);
+      checkArgument(paneInfo != null);
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
@@ -1029,13 +1032,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
     @Experimental(Kind.FILESYSTEM)
     public ResourceId getDestinationFile(
+        boolean windowedWrites,
         DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
         int numShards,
         OutputFileHints outputFileHints) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);
       checkArgument(numShards > 0);
       FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
-      if (getWindow() != null) {
+      if (windowedWrites) {
         return policy.windowedFilename(
             getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints);
       } else {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 35b28a1..19457e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -387,7 +388,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
     private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
     private final Coder<DestinationT> destinationCoder;
-    private final boolean windowedWrites;
 
     // Initialized in startBundle()
     private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
@@ -395,10 +395,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
     WriteBundles(
-        boolean windowedWrites,
         TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
         Coder<DestinationT> destinationCoder) {
-      this.windowedWrites = windowedWrites;
       this.unwrittenRecordsTag = unwrittenRecordsTag;
       this.destinationCoder = destinationCoder;
     }
@@ -466,13 +464,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           throw e;
         }
         BoundedWindow window = key.window;
-        FileResult<DestinationT> res =
-            windowedWrites
-                ? new FileResult<>(
-                    writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
-                : new FileResult<>(
-                    writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
-        c.output(res, window.maxTimestamp(), window);
+        c.output(
+            new FileResult<>(
+                writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination),
+            window.maxTimestamp(),
+            window);
       }
     }
 
@@ -535,14 +531,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
                 ? c.element().getKey().getShardNumber()
                 : UNKNOWN_SHARDNUM;
-        if (windowedWrites) {
-          c.output(
-              new FileResult<>(
-                  writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
-        } else {
-          c.output(
-              new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
-        }
+        c.output(
+            new FileResult<>(
+                writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
       }
     }
 
@@ -706,7 +697,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       PCollectionTuple writeTuple =
           input.apply(
               writeName,
-              ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+              ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
                   .withSideInputs(sideInputs)
                   .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
@@ -1011,14 +1002,19 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           writer.open(uuid, destination);
           writer.close();
           completeResults.add(
-              new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
+              new FileResult<>(
+                  writer.getOutputFile(),
+                  shard,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  destination));
         }
         LOG.debug("Done creating extra shards for {}.", destination);
       }
       return
           writeOperation.buildOutputFilenames(
               destination,
-              null,
+              GlobalWindow.INSTANCE,
               (fixedNumShards == null) ? null : completeResults.size(),
               completeResults);
     }
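
(Illustrative note, not part of the commit above: after this change an unwindowed write fills in the global window and the on-time pane instead of nulls, and the windowedWrites flag passed to getDestinationFile, not a null window, decides which filename policy applies. A minimal sketch of constructing such a result; the class name, temp path and destination value are made up.)

    import org.apache.beam.sdk.io.FileBasedSink.FileResult;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;

    class UnwindowedResultSketch {
      static FileResult<String> forTempFile(String tempPath) {
        // matchNewResource only names the file; nothing is created here.
        ResourceId tempFile = FileSystems.matchNewResource(tempPath, false /* isDirectory */);
        // Window and pane are now always non-null, even for unwindowed writes.
        return new FileResult<>(
            tempFile, 0, GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING, "default");
      }
    }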

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam] 13/13: This closes #4145: Many simplifications to WriteFiles

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 761ec1af410f0ee153893f6e7082db85d0fdc3e7
Merge: b059664 d314339
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Tue Dec 5 16:27:17 2017 -0800

    This closes #4145: Many simplifications to WriteFiles

 .../apache/beam/examples/WindowedWordCount.java    |    5 +-
 .../examples/common/WriteOneFilePerWindow.java     |   12 +-
 .../apache/beam/examples/WindowedWordCountIT.java  |    8 +
 .../beam/runners/apex/examples/WordCountTest.java  |    2 +-
 .../core/construction/WriteFilesTranslation.java   |    7 +-
 .../construction/WriteFilesTranslationTest.java    |   13 +-
 .../beam/runners/dataflow/DataflowRunnerTest.java  |    2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java    |    5 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |  281 +++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 1061 ++++++++------------
 .../java/org/apache/beam/sdk/transforms/Reify.java |   73 +-
 .../apache/beam/sdk/values/TypeDescriptors.java    |    4 +
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |   66 +-
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |   27 +-
 14 files changed, 745 insertions(+), 821 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.

[beam] 05/13: consolidates windowed/unwindowed finalize fns somewhat

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 97df5e703d4a891ab63a40b46c4e87d7c373168b
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 19:51:10 2017 -0800

    consolidates windowed/unwindowed finalize fns somewhat
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 138 +++++++++----
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 221 ++++++---------------
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  |  19 +-
 3 files changed, 171 insertions(+), 207 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index c8bdbfc..5bc84be 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -28,10 +28,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.extractFromTypeParamete
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -39,6 +41,7 @@ import java.io.Serializable;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -46,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -72,6 +76,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -103,10 +108,9 @@ import org.slf4j.LoggerFactory;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the finalize method. Each call to {@link Writer#open} or {@link
- * Writer#openUnwindowed} is passed a unique <i>bundle id</i> when it is called by the WriteFiles
- * transform, so even redundant or retried bundles will have a unique way of identifying their
- * output.
+ * result passed to the finalize method. Each call to {@link Writer#open} is passed a unique
+ * <i>bundle id</i> when it is called by the WriteFiles transform, so even redundant or retried
+ * bundles will have a unique way of identifying their output.
  *
  * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
  * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
@@ -447,7 +451,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
    * written,
    *
    * <ol>
-   *   <li>{@link WriteOperation#finalize} is given a list of the temporary files containing the
+   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the
    *       output bundles.
    *   <li>During finalize, these temporary files are copied to final output locations and named
    *       according to a file naming template.
@@ -577,17 +581,22 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      * not be cleaned up. Note that {@link WriteFiles} does attempt clean up files if exceptions
      * are thrown, however there are still some scenarios where temporary files might be left.
      */
-    public void removeTemporaryFiles(Set<ResourceId> filenames) throws IOException {
+    public void removeTemporaryFiles(Collection<ResourceId> filenames) throws IOException {
       removeTemporaryFiles(filenames, !windowedWrites);
     }
 
     @Experimental(Kind.FILESYSTEM)
-    protected final List<KV<FileResult<DestinationT>, ResourceId>> buildOutputFilenames(
+    protected final List<KV<FileResult<DestinationT>, ResourceId>> finalizeDestination(
         @Nullable DestinationT dest,
         @Nullable BoundedWindow window,
         @Nullable Integer numShards,
-        Iterable<FileResult<DestinationT>> writerResults) {
-      for (FileResult<DestinationT> res : writerResults) {
+        Collection<FileResult<DestinationT>> existingResults) throws Exception {
+      Collection<FileResult<DestinationT>> completeResults =
+          windowedWrites
+              ? existingResults
+              : createMissingEmptyShards(dest, numShards, existingResults);
+
+      for (FileResult<DestinationT> res : completeResults) {
         checkArgument(
             Objects.equals(dest, res.getDestination()),
             "File result has wrong destination: expected %s, got %s",
@@ -602,7 +611,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       final int effectiveNumShards;
       if (numShards != null) {
         effectiveNumShards = numShards;
-        for (FileResult<DestinationT> res : writerResults) {
+        for (FileResult<DestinationT> res : completeResults) {
           checkArgument(
               res.getShard() != UNKNOWN_SHARDNUM,
               "Fixed sharding into %s shards was specified, "
@@ -611,8 +620,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
               res);
         }
       } else {
-        effectiveNumShards = Iterables.size(writerResults);
-        for (FileResult<DestinationT> res : writerResults) {
+        effectiveNumShards = Iterables.size(completeResults);
+        for (FileResult<DestinationT> res : completeResults) {
           checkArgument(
               res.getShard() == UNKNOWN_SHARDNUM,
               "Runner-chosen sharding was specified, "
@@ -623,7 +632,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
       List<FileResult<DestinationT>> resultsWithShardNumbers = Lists.newArrayList();
       if (numShards != null) {
-        resultsWithShardNumbers = Lists.newArrayList(writerResults);
+        resultsWithShardNumbers = Lists.newArrayList(completeResults);
       } else {
         checkState(
             !windowedWrites,
@@ -644,7 +653,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
                         return firstFilename.compareTo(secondFilename);
                       }
                     })
-                .sortedCopy(writerResults);
+                .sortedCopy(completeResults);
         for (int i = 0; i < sortedByTempFilename.size(); i++) {
           resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
         }
@@ -672,10 +681,71 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       return outputFilenames;
     }
 
+    private Collection<FileResult<DestinationT>> createMissingEmptyShards(
+        @Nullable DestinationT dest,
+        @Nullable Integer numShards,
+        Collection<FileResult<DestinationT>> existingResults)
+        throws Exception {
+      Collection<FileResult<DestinationT>> completeResults;
+      LOG.info("Finalizing for destination {} num shards {}.", dest, existingResults.size());
+      if (numShards != null) {
+        checkArgument(
+            existingResults.size() <= numShards,
+            "Fixed sharding into %s shards was specified, but got %s file results",
+            numShards,
+            existingResults.size());
+      }
+      // We must always output at least 1 shard, and honor user-specified numShards
+      // if set.
+      Set<Integer> missingShardNums;
+      if (numShards == null) {
+        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+      } else {
+        missingShardNums = Sets.newHashSet();
+        for (int i = 0; i < numShards; ++i) {
+          missingShardNums.add(i);
+        }
+        for (FileResult<DestinationT> res : existingResults) {
+          checkArgument(
+              res.getShard() != UNKNOWN_SHARDNUM,
+              "Fixed sharding into %s shards was specified, "
+                  + "but file result %s does not specify a shard",
+              numShards,
+              res);
+          missingShardNums.remove(res.getShard());
+        }
+      }
+      completeResults = Lists.newArrayList(existingResults);
+      if (!missingShardNums.isEmpty()) {
+        LOG.info(
+            "Creating {} empty output shards in addition to {} written for destination {}.",
+            missingShardNums.size(),
+            existingResults.size(),
+            dest);
+        for (int shard : missingShardNums) {
+          String uuid = UUID.randomUUID().toString();
+          LOG.info("Opening empty writer {} for destination {}", uuid, dest);
+          Writer<DestinationT, ?> writer = createWriter();
+          // Currently this code path is only called in the unwindowed case.
+          writer.open(uuid, dest);
+          writer.close();
+          completeResults.add(
+              new FileResult<>(
+                  writer.getOutputFile(),
+                  shard,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  dest));
+        }
+        LOG.debug("Done creating extra shards for {}.", dest);
+      }
+      return completeResults;
+    }
+
     /**
      * Copy temporary files to final output filenames using the file naming template.
      *
-     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
      *
      * <p>Files will be named according to the {@link FilenamePolicy}. The order of the output files
      * will be the same as the sorted order of the input filenames. In other words (when using
@@ -686,40 +756,38 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
-    final void copyToOutputFiles(
+    final void moveToOutputFiles(
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
       int numFiles = resultsToFinalFilenames.size();
-      if (numFiles > 0) {
-        LOG.debug("Copying {} files.", numFiles);
-        List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
-        List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
-        for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
-          srcFiles.add(entry.getKey().getTempFilename());
-          dstFiles.add(entry.getValue());
-          LOG.info(
-              "Will copy temporary file {} to final location {}",
-              entry.getKey().getTempFilename(),
-              entry.getValue());
-        }
-        // During a failure case, files may have been deleted in an earlier step. Thus
-        // we ignore missing files here.
-        FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
-      } else {
-        LOG.info("No output files to write.");
+      LOG.debug("Copying {} files.", numFiles);
+      List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
+        srcFiles.add(entry.getKey().getTempFilename());
+        dstFiles.add(entry.getValue());
+        LOG.info(
+            "Will copy temporary file {} to final location {}",
+            entry.getKey().getTempFilename(),
+            entry.getValue());
       }
+      // During a failure case, files may have been deleted in an earlier step. Thus
+      // we ignore missing files here.
+      FileSystems.copy(srcFiles, dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
+      removeTemporaryFiles(srcFiles);
     }
 
     /**
      * Removes temporary output files. Uses the temporary directory to find files to remove.
      *
-     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalizeDestination}.
      * <b>Note:</b>If finalize is overridden and does <b>not</b> rename or otherwise finalize
      * temporary files, this method will remove them.
      */
     @VisibleForTesting
     @Experimental(Kind.FILESYSTEM)
     final void removeTemporaryFiles(
-        Set<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory) throws IOException {
+        Collection<ResourceId> knownFiles, boolean shouldRemoveTemporaryDirectory)
+        throws IOException {
       ResourceId tempDir = tempDirectory.get();
       LOG.debug("Removing temporary bundle output files in {}.", tempDir);
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 28ac1a5..9cfabfe 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -26,18 +26,14 @@ import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
@@ -66,6 +62,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -630,7 +627,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * FileBasedSink} for a description of writer results)-one for each bundle.
    *
   * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer
-   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called to finalize
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalizeDestination} is called to finalize
    * the write.
    *
    * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
@@ -661,11 +658,25 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     // PCollection. There is a dependency between this ParDo and the first (the
     // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
-    PCollectionView<Integer> numShardsView =
+    final PCollectionView<Integer> numShardsView =
         (computeNumShards == null) ? null : input.apply(computeNumShards);
     List<PCollectionView<Integer>> shardingSideInputs = numShardsView == null
         ? ImmutableList.<PCollectionView<Integer>>of()
         : ImmutableList.of(numShardsView);
+    SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards =
+        new SerializableFunction<DoFn.ProcessContext, Integer>() {
+          @Override
+          public Integer apply(DoFn<?, ?>.ProcessContext c) {
+            if (numShardsView != null) {
+              return c.sideInput(numShardsView);
+            } else if (numShardsProvider != null) {
+              return numShardsProvider.get();
+            } else {
+              return null;
+            }
+          }
+        };
+
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> shardedWindowCoder =
         (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
@@ -755,16 +766,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply(Values.<FileResult<DestinationT>>create())
               .apply(
                   "FinalizeWindowed",
-                  ParDo.of(
-                          new FinalizeWindowedFn<>(
-                              numShardsView, numShardsProvider, writeOperation))
-                      .withSideInputs(
-                          numShardsView == null
-                              ? ImmutableList.<PCollectionView<?>>of()
-                              : ImmutableList.of(numShardsView)))
+                  ParDo.of(new FinalizeWindowedFn<>(getFixedNumShards, writeOperation))
+                      .withSideInputs(shardingSideInputs))
               .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()));
     } else {
-      final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
+      PCollectionView<Iterable<FileResult<DestinationT>>> resultsView =
           results.apply(View.<FileResult<DestinationT>>asIterable());
 
       // Finalize the write in another do-once ParDo on the singleton collection containing the
@@ -775,14 +781,13 @@ public class WriteFiles<UserT, DestinationT, OutputT>
      // For the non-windowed case, we guarantee that if no data is written but the user has
       // set numShards, then all shards will be written out as empty files. For this reason we
       // use a side input here.
-      PCollection<Void> singletonCollection = p.apply(Create.of((Void) null));
       outputFilenames =
-          singletonCollection
+          p.apply(Create.of((Void) null))
               .apply(
                   "FinalizeUnwindowed",
                   ParDo.of(
                           new FinalizeUnwindowedFn<>(
-                              numShardsView, numShardsProvider, resultsView, writeOperation))
+                              getFixedNumShards, resultsView, writeOperation))
                       .withSideInputs(
                           FluentIterable.concat(sideInputs, shardingSideInputs)
                               .append(resultsView)
@@ -798,19 +803,16 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
   private static class FinalizeWindowedFn<DestinationT>
       extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> {
-    @Nullable private final PCollectionView<Integer> numShardsView;
-    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
     private final WriteOperation<DestinationT, ?> writeOperation;
 
     @Nullable private transient List<FileResult<DestinationT>> fileResults;
     @Nullable private Integer fixedNumShards;
 
     public FinalizeWindowedFn(
-        @Nullable PCollectionView<Integer> numShardsView,
-        @Nullable ValueProvider<Integer> numShardsProvider,
+        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
         WriteOperation<DestinationT, ?> writeOperation) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
+      this.getFixedNumShards = getFixedNumShards;
       this.writeOperation = writeOperation;
     }
 
@@ -824,58 +826,37 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public void processElement(ProcessContext c) {
       fileResults.add(c.element());
       if (fixedNumShards == null) {
-        if (numShardsView != null) {
-          fixedNumShards = c.sideInput(numShardsView);
-        } else if (numShardsProvider != null) {
-          fixedNumShards = numShardsProvider.get();
-        } else {
-          throw new IllegalStateException(
-              "When finalizing a windowed write, should have set fixed sharding");
-        }
+        fixedNumShards = getFixedNumShards.apply(c);
+        checkState(fixedNumShards != null, "Windowed write should have set fixed sharding");
       }
     }
 
     @FinishBundle
     public void finishBundle(FinishBundleContext c) throws Exception {
-      Set<ResourceId> tempFiles = Sets.newHashSet();
-      Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results =
-          groupByDestinationAndWindow(fileResults);
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
-      for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
-          destEntry : results.asMap().entrySet()) {
-        DestinationT destination = destEntry.getKey().getKey();
-        BoundedWindow window = destEntry.getKey().getValue();
-        resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames(
-            destination, window, fixedNumShards, destEntry.getValue()));
-      }
-      LOG.info("Will finalize {} files", resultsToFinalFilenames.size());
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+          finalizeAllDestinations(writeOperation, fileResults, fixedNumShards);
       for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         FileResult<DestinationT> res = entry.getKey();
-        tempFiles.add(res.getTempFilename());
         c.output(
             KV.of(res.getDestination(), entry.getValue().toString()),
             res.getWindow().maxTimestamp(),
             res.getWindow());
       }
-      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
-      writeOperation.removeTemporaryFiles(tempFiles);
+      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
   }
 
   private static class FinalizeUnwindowedFn<DestinationT>
       extends DoFn<Void, KV<DestinationT, String>> {
-    @Nullable private final PCollectionView<Integer> numShardsView;
-    @Nullable private final ValueProvider<Integer> numShardsProvider;
+    private final SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards;
     private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView;
     private final WriteOperation<DestinationT, ?> writeOperation;
 
     public FinalizeUnwindowedFn(
-        @Nullable PCollectionView<Integer> numShardsView,
-        @Nullable ValueProvider<Integer> numShardsProvider,
+        SerializableFunction<DoFn.ProcessContext, Integer> getFixedNumShards,
         PCollectionView<Iterable<FileResult<DestinationT>>> resultsView,
         WriteOperation<DestinationT, ?> writeOperation) {
-      this.numShardsView = numShardsView;
-      this.numShardsProvider = numShardsProvider;
+      this.getFixedNumShards = getFixedNumShards;
       this.resultsView = resultsView;
       this.writeOperation = writeOperation;
     }
@@ -883,118 +864,40 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
-      @Nullable Integer fixedNumShards;
-      if (numShardsView != null) {
-        fixedNumShards = c.sideInput(numShardsView);
-      } else if (numShardsProvider != null) {
-        fixedNumShards = numShardsProvider.get();
-      } else {
-        fixedNumShards = null;
-      }
-      Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap =
-          ArrayListMultimap.create();
-      for (FileResult<DestinationT> result : c.sideInput(resultsView)) {
-        resultsByDestMultimap.put(result.getDestination(), result);
-      }
-      Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest =
-          resultsByDestMultimap.asMap();
-      if (resultsByDest.isEmpty()) {
-        Collection<FileResult<DestinationT>> empty = ImmutableList.of();
-        resultsByDest =
-            Collections.singletonMap(
-                writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty);
-      }
-      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
-      for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>>
-          destEntry : resultsByDest.entrySet()) {
-        resultsToFinalFilenames.addAll(
-            finalizeForDestinationFillEmptyShards(
-                destEntry.getKey(), fixedNumShards, destEntry.getValue()));
-      }
-      Set<ResourceId> tempFiles = Sets.newHashSet();
-      for (KV<FileResult<DestinationT>, ResourceId> entry :
-          resultsToFinalFilenames) {
-        tempFiles.add(entry.getKey().getTempFilename());
+      List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.sideInput(resultsView));
+      List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
+          fileResults.isEmpty()
+              ? writeOperation.finalizeDestination(
+                  writeOperation.getSink().getDynamicDestinations().getDefaultDestination(),
+                  GlobalWindow.INSTANCE,
+                  getFixedNumShards.apply(c),
+                  ImmutableList.<FileResult<DestinationT>>of())
+              : finalizeAllDestinations(writeOperation, fileResults, getFixedNumShards.apply(c));
+      for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString()));
       }
-      writeOperation.copyToOutputFiles(resultsToFinalFilenames);
-      writeOperation.removeTemporaryFiles(tempFiles);
+      writeOperation.moveToOutputFiles(resultsToFinalFilenames);
     }
+  }
 
-    /**
-     * Finalize a list of files for a single destination. If a minimum number of shards is needed,
-     * this function will generate empty files for this destination to ensure that all shards are
-     * generated.
-     */
-    private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards(
-        DestinationT destination,
-        @Nullable Integer fixedNumShards,
-        Collection<FileResult<DestinationT>> existingResults)
-        throws Exception {
-      checkState(!writeOperation.windowedWrites);
-
-      LOG.info(
-          "Finalizing write operation {} for destination {} num shards {}.",
-          writeOperation,
-          destination,
-          existingResults.size());
-      if (fixedNumShards != null) {
-        checkArgument(
-            existingResults.size() <= fixedNumShards,
-            "Fixed sharding into %s shards was specified, but got %s file results",
-            fixedNumShards,
-            existingResults.size());
-      }
-      // We must always output at least 1 shard, and honor user-specified numShards
-      // if set.
-      Set<Integer> missingShardNums;
-      if (fixedNumShards == null) {
-        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
-      } else {
-        missingShardNums = Sets.newHashSet();
-        for (int i = 0; i < fixedNumShards; ++i) {
-          missingShardNums.add(i);
-        }
-        for (FileResult<DestinationT> res : existingResults) {
-          checkArgument(
-              res.getShard() != UNKNOWN_SHARDNUM,
-              "Fixed sharding into %s shards was specified, "
-                  + "but file result %s does not specify a shard",
+  private static <DestinationT>
+      List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
+          WriteOperation<DestinationT, ?> writeOperation,
+          List<FileResult<DestinationT>> fileResults,
+          Integer fixedNumShards)
+          throws Exception {
+    List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
+    Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> resultsByDestMultimap =
+        groupByDestinationAndWindow(fileResults);
+    for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
+        destEntry : resultsByDestMultimap.asMap().entrySet()) {
+      resultsToFinalFilenames.addAll(
+          writeOperation.finalizeDestination(
+              destEntry.getKey().getKey(),
+              destEntry.getKey().getValue(),
               fixedNumShards,
-              res);
-          missingShardNums.remove(res.getShard());
-        }
-      }
-      List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults);
-      if (!missingShardNums.isEmpty()) {
-        LOG.info(
-            "Creating {} empty output shards in addition to {} written for destination {}.",
-            missingShardNums.size(),
-            existingResults.size(),
-            destination);
-        for (int shard : missingShardNums) {
-          String uuid = UUID.randomUUID().toString();
-          LOG.info("Opening empty writer {} for destination {}", uuid, writeOperation, destination);
-          Writer<DestinationT, ?> writer = writeOperation.createWriter();
-          // Currently this code path is only called in the unwindowed case.
-          writer.open(uuid, destination);
-          writer.close();
-          completeResults.add(
-              new FileResult<>(
-                  writer.getOutputFile(),
-                  shard,
-                  GlobalWindow.INSTANCE,
-                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
-                  destination));
-        }
-        LOG.debug("Done creating extra shards for {}.", destination);
-      }
-      return
-          writeOperation.buildOutputFilenames(
-              destination,
-              GlobalWindow.INSTANCE,
-              (fixedNumShards == null) ? null : completeResults.size(),
-              completeResults);
+              destEntry.getValue()));
     }
+    return resultsToFinalFilenames;
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index f7988bb..561d036 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,7 +44,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
@@ -205,13 +203,8 @@ public class FileBasedSinkTest {
 
     // TODO: test with null first argument?
     List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
-        writeOp.buildOutputFilenames(null, null, null, fileResults);
-    Set<ResourceId> tempFiles = Sets.newHashSet();
-    for (KV<FileResult<Void>, ResourceId> res : resultsToFinalFilenames) {
-      tempFiles.add(res.getKey().getTempFilename());
-    }
-    writeOp.copyToOutputFiles(resultsToFinalFilenames);
-    writeOp.removeTemporaryFiles(tempFiles);
+        writeOp.finalizeDestination(null, null, null, fileResults);
+    writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     for (int i = 0; i < numFiles; i++) {
       ResourceId outputFilename =
@@ -304,7 +297,7 @@ public class FileBasedSinkTest {
     }
 
     // Copy input files to output files.
-    writeOp.copyToOutputFiles(resultsToFinalFilenames);
+    writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -355,7 +348,7 @@ public class FileBasedSinkTest {
 
   /** Reject non-distinct output filenames. */
   @Test
-  public void testCollidingOutputFilenames() throws IOException {
+  public void testCollidingOutputFilenames() throws Exception {
     ResourceId root = getBaseOutputDirectory();
     SimpleSink<Void> sink =
         SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
@@ -366,12 +359,12 @@ public class FileBasedSinkTest {
     ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult<Void>> results =
+      List<FileResult<Void>> results =
           Lists.newArrayList(
               new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
               new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
               new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
-      writeOp.buildOutputFilenames(null, null, 5 /* numShards */, results);
+      writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
       fail("Should have failed.");
     } catch (IllegalArgumentException exn) {
       assertThat(exn.getMessage(), containsString("generated the same name"));

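For reference, the new finalizeAllDestinations helper above only groups the collected
FileResult objects by (destination, window) and hands each group to
WriteOperation.finalizeDestination, which assigns the final filenames. Below is a minimal,
self-contained Java sketch of that grouping step, using made-up stand-in types rather than
the real Beam classes:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FinalizeGroupingSketch {
      // Hypothetical stand-in for Beam's FileResult<DestinationT>; only the fields
      // needed to illustrate the grouping are modeled here.
      static class FileResult {
        final String tempFilename;
        final String destination; // stands in for DestinationT
        final String window;      // stands in for BoundedWindow

        FileResult(String tempFilename, String destination, String window) {
          this.tempFilename = tempFilename;
          this.destination = destination;
          this.window = window;
        }
      }

      public static void main(String[] args) {
        List<FileResult> results = Arrays.asList(
            new FileResult("/tmp/a", "logs", "w1"),
            new FileResult("/tmp/b", "logs", "w1"),
            new FileResult("/tmp/c", "metrics", "w2"));

        // Group by (destination, window), mirroring groupByDestinationAndWindow(...).
        Map<List<String>, List<FileResult>> byDestAndWindow = new HashMap<>();
        for (FileResult r : results) {
          byDestAndWindow
              .computeIfAbsent(
                  Arrays.asList(r.destination, r.window), k -> new ArrayList<FileResult>())
              .add(r);
        }

        // Each group would then be passed to finalizeDestination(destination, window,
        // fixedNumShards, group); here we just print the grouping.
        for (Map.Entry<List<String>, List<FileResult>> e : byDestAndWindow.entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue().size() + " temp file(s)");
        }
      }
    }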

[beam] 08/13: Converts WriteFiles to AutoValue

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5b600da86a21e2c4339261d73fa1c2588cb3ab8d
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Fri Nov 17 12:38:15 2017 -0800

    Converts WriteFiles to AutoValue
---
 .../core/construction/WriteFilesTranslation.java   |   5 +-
 .../construction/WriteFilesTranslationTest.java    |  12 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java |   4 +-
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 187 +++++++++------------
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |  22 +--
 5 files changed, 104 insertions(+), 126 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index d0b2182..a6dd55c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -85,12 +85,13 @@ public class WriteFilesTranslation {
 
           @Override
           public boolean isWindowedWrites() {
-            return transform.isWindowedWrites();
+            return transform.getWindowedWrites();
           }
 
           @Override
           public boolean isRunnerDeterminedSharding() {
-            return transform.getNumShards() == null && transform.getSharding() == null;
+            return transform.getNumShardsProvider() == null
+                && transform.getComputeNumShards() == null;
           }
         },
         components);
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index ccb366e..2d45681 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -80,9 +80,11 @@ public class WriteFilesTranslationTest {
 
       assertThat(
           payload.getRunnerDeterminedSharding(),
-          equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null));
+          equalTo(
+              writeFiles.getNumShardsProvider() == null
+                  && writeFiles.getComputeNumShards() == null));
 
-      assertThat(payload.getWindowedWrites(), equalTo(writeFiles.isWindowedWrites()));
+      assertThat(payload.getWindowedWrites(), equalTo(writeFiles.getWindowedWrites()));
 
       assertThat(
           (FileBasedSink<String, Void, String>)
@@ -102,11 +104,13 @@ public class WriteFilesTranslationTest {
 
       assertThat(
           WriteFilesTranslation.isRunnerDeterminedSharding(appliedPTransform),
-          equalTo(writeFiles.getNumShards() == null && writeFiles.getSharding() == null));
+          equalTo(
+              writeFiles.getNumShardsProvider() == null
+                  && writeFiles.getComputeNumShards() == null));
 
       assertThat(
           WriteFilesTranslation.isWindowedWrites(appliedPTransform),
-          equalTo(writeFiles.isWindowedWrites()));
+          equalTo(writeFiles.getWindowedWrites()));
       assertThat(
           WriteFilesTranslation.<String, Void, String>getSink(appliedPTransform),
           equalTo(writeFiles.getSink()));
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 5bc84be..20d2a27 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -451,8 +451,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
    * written,
    *
    * <ol>
-   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files containing the
-   *       output bundles.
+   *   <li>{@link WriteOperation#finalizeDestination} is given a list of the temporary files
+   *       containing the output bundles.
    *   <li>During finalize, these temporary files are copied to final output locations and named
    *       according to a file naming template.
    *   <li>Finally, any temporary files that were created during the write are removed.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 0a538b1..d6c5788 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
@@ -108,7 +109,8 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class WriteFiles<UserT, DestinationT, OutputT>
+@AutoValue
+public abstract class WriteFiles<UserT, DestinationT, OutputT>
     extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
@@ -125,19 +127,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
 
   static final int UNKNOWN_SHARDNUM = -1;
-  private FileBasedSink<UserT, DestinationT, OutputT> sink;
   private @Nullable WriteOperation<DestinationT, OutputT> writeOperation;
-  // This allows the number of shards to be dynamically computed based on the input
-  // PCollection.
-  private final @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards;
-  // We don't use a side input for static sharding, as we want this value to be updatable
-  // when a pipeline is updated.
-  private final @Nullable ValueProvider<Integer> numShardsProvider;
-  private final boolean windowedWrites;
-  private int maxNumWritersPerBundle;
-  // This is the set of side inputs used by this transform. This is usually populated by the users's
-  // DynamicDestinations object.
-  private final List<PCollectionView<?>> sideInputs;
 
   /**
    * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
@@ -146,57 +136,59 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
       FileBasedSink<UserT, DestinationT, OutputT> sink) {
     checkArgument(sink != null, "sink can not be null");
-    return new WriteFiles<>(
-        sink,
-        null /* runner-determined sharding */,
-        null,
-        false,
-        DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE,
-        sink.getDynamicDestinations().getSideInputs());
+    return new AutoValue_WriteFiles.Builder<UserT, DestinationT, OutputT>()
+        .setSink(sink)
+        .setComputeNumShards(null)
+        .setNumShardsProvider(null)
+        .setWindowedWrites(false)
+        .setMaxNumWritersPerBundle(DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE)
+        .setSideInputs(sink.getDynamicDestinations().getSideInputs())
+        .build();
   }
 
-  private WriteFiles(
-      FileBasedSink<UserT, DestinationT, OutputT> sink,
-      @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards,
-      @Nullable ValueProvider<Integer> numShardsProvider,
-      boolean windowedWrites,
-      int maxNumWritersPerBundle,
-      List<PCollectionView<?>> sideInputs) {
-    this.sink = sink;
-    this.computeNumShards = computeNumShards;
-    this.numShardsProvider = numShardsProvider;
-    this.windowedWrites = windowedWrites;
-    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
-    this.sideInputs = sideInputs;
-  }
+  public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink();
 
-  @Override
-  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
-    return PCollectionViews.toAdditionalInputs(sideInputs);
-  }
+  @Nullable
+  public abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards();
 
-  /** Returns the {@link FileBasedSink} associated with this PTransform. */
-  public FileBasedSink<UserT, DestinationT, OutputT> getSink() {
-    return sink;
-  }
+  // We don't use a side input for static sharding, as we want this value to be updatable
+  // when a pipeline is updated.
+  @Nullable
+  public abstract ValueProvider<Integer> getNumShardsProvider();
 
-  /** Returns whether or not to perform windowed writes. */
-  public boolean isWindowedWrites() {
-    return windowedWrites;
-  }
+  public abstract boolean getWindowedWrites();
 
-  /**
-   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-   * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-   * #withRunnerDeterminedSharding()}.
-   */
-  public @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() {
-    return computeNumShards;
+  abstract int getMaxNumWritersPerBundle();
+
+  abstract List<PCollectionView<?>> getSideInputs();
+
+  abstract Builder<UserT, DestinationT, OutputT> toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder<UserT, DestinationT, OutputT> {
+    abstract Builder<UserT, DestinationT, OutputT> setSink(
+        FileBasedSink<UserT, DestinationT, OutputT> sink);
+
+    abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards(
+        PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards);
+
+    abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider(
+        ValueProvider<Integer> numShardsProvider);
+
+    abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites);
+
+    abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
+        int maxNumWritersPerBundle);
+
+    abstract Builder<UserT, DestinationT, OutputT> setSideInputs(
+        List<PCollectionView<?>> sideInputs);
+
+    abstract WriteFiles<UserT, DestinationT, OutputT> build();
   }
 
-  public @Nullable ValueProvider<Integer> getNumShards() {
-    return numShardsProvider;
+  @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return PCollectionViews.toAdditionalInputs(getSideInputs());
   }
 
   /**
@@ -225,36 +217,18 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    */
   public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
       ValueProvider<Integer> numShardsProvider) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return toBuilder().setNumShardsProvider(numShardsProvider).build();
   }
 
   /** Set the maximum number of writers created in a bundle before spilling to shuffle. */
   public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
       int maxNumWritersPerBundle) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
   }
 
   public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
       List<PCollectionView<?>> sideInputs) {
-    return new WriteFiles<>(
-        sink,
-        computeNumShards,
-        numShardsProvider,
-        windowedWrites,
-        maxNumWritersPerBundle,
-        sideInputs);
+    return toBuilder().setSideInputs(sideInputs).build();
   }
 
   /**
@@ -268,8 +242,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
     checkArgument(
         sharding != null, "sharding can not be null. Use withRunnerDeterminedSharding() instead.");
-    return new WriteFiles<>(
-        sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
+    return toBuilder().setComputeNumShards(sharding).build();
   }
 
   /**
@@ -277,7 +250,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * runner-determined sharding.
    */
   public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
-    return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs);
+    return toBuilder().setComputeNumShards(null).setNumShardsProvider(null).build();
   }
 
   /**
@@ -293,35 +266,34 @@ public class WriteFiles<UserT, DestinationT, OutputT>
    * <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
    */
   public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
-    return new WriteFiles<>(
-        sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs);
+    return toBuilder().setWindowedWrites(true).build();
   }
 
   @Override
   public void validate(PipelineOptions options) {
-    sink.validate(options);
+    getSink().validate(options);
   }
 
   @Override
   public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
     if (input.isBounded() == IsBounded.UNBOUNDED) {
       checkArgument(
-          windowedWrites,
+          getWindowedWrites(),
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
     }
-    if (windowedWrites) {
+    if (getWindowedWrites()) {
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
       // and similar behavior in other runners.
       checkArgument(
-          computeNumShards != null || numShardsProvider != null,
+          getComputeNumShards() != null || getNumShardsProvider() != null,
           "When using windowed writes, must specify number of output shards explicitly",
           WriteFiles.class.getSimpleName());
     }
-    this.writeOperation = sink.createWriteOperation();
-    this.writeOperation.setWindowedWrites(windowedWrites);
+    this.writeOperation = getSink().createWriteOperation();
+    this.writeOperation.setWindowedWrites(getWindowedWrites());
 
-    if (!windowedWrites) {
+    if (!getWindowedWrites()) {
       // Re-window the data into the global window and remove any existing triggers.
       input =
           input.apply(
@@ -347,10 +319,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         FileResultCoder.of(windowCoder, destinationCoder);
 
     PCollectionView<Integer> numShardsView =
-        (computeNumShards == null) ? null : input.apply(computeNumShards);
+        (getComputeNumShards() == null) ? null : input.apply(getComputeNumShards());
 
     PCollection<FileResult<DestinationT>> tempFileResults =
-        (computeNumShards == null && numShardsProvider == null)
+        (getComputeNumShards() == null && getNumShardsProvider() == null)
             ? input.apply(
                 "WriteUnshardedBundlesToTempFiles",
                 new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
@@ -370,13 +342,14 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder
-        .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink"))
-        .include("sink", sink);
-    if (getSharding() != null) {
-      builder.include("sharding", getSharding());
+        .add(DisplayData.item("sink", getSink().getClass()).withLabel("WriteFiles Sink"))
+        .include("sink", getSink());
+    if (getComputeNumShards() != null) {
+      builder.include("sharding", getComputeNumShards());
     } else {
       builder.addIfNotNull(
-          DisplayData.item("numShards", getNumShards()).withLabel("Fixed Number of Shards"));
+          DisplayData.item("numShards", getNumShardsProvider())
+              .withLabel("Fixed Number of Shards"));
     }
   }
 
@@ -395,7 +368,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
 
     @Override
     public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         // Reshuffle the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
         return input
@@ -437,7 +410,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               ParDo.of(
                       new WriteUnshardedTempFilesWithSpillingFn(
                           spilledRecordsTag, destinationCoder))
-                  .withSideInputs(sideInputs)
+                  .withSideInputs(getSideInputs())
                   .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
@@ -456,7 +429,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
               .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
                   "WriteSpilled",
-                  ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+                  ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
               .setCoder(fileResultCoder)
               .apply(
                   "DropShardNum",
@@ -514,7 +487,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
       Writer<DestinationT, OutputT> writer = writers.get(key);
       if (writer == null) {
-        if (writers.size() <= maxNumWritersPerBundle) {
+        if (writers.size() <= getMaxNumWritersPerBundle()) {
           String uuid = UUID.randomUUID().toString();
           LOG.info(
               "Opening writer {} for window {} pane {} destination {}",
@@ -656,7 +629,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create())
           .apply(
               "WriteShardsIntoTempFiles",
-              ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(sideInputs))
+              ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
           .setCoder(fileResultCoder);
     }
   }
@@ -680,8 +653,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       if (numShardsView != null) {
         shardCount = context.sideInput(numShardsView);
       } else {
-        checkNotNull(numShardsProvider);
-        shardCount = numShardsProvider.get();
+        checkNotNull(getNumShardsProvider());
+        shardCount = getNumShardsProvider().get();
       }
       checkArgument(
           shardCount > 0,
@@ -771,7 +744,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     public WriteFilesResult<DestinationT> expand(
         PCollection<Iterable<FileResult<DestinationT>>> input) {
 
-      List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(sideInputs);
+      List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
       if (numShardsView != null) {
         finalizeSideInputs.add(numShardsView);
       }
@@ -794,10 +767,10 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         @Nullable Integer fixedNumShards;
         if (numShardsView != null) {
           fixedNumShards = c.sideInput(numShardsView);
-        } else if (numShardsProvider != null) {
-          fixedNumShards = numShardsProvider.get();
+        } else if (getNumShardsProvider() != null) {
+          fixedNumShards = getNumShardsProvider().get();
         } else {
-          checkState(!windowedWrites, "Windowed write should have set fixed sharding");
+          checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding");
           fixedNumShards = null;
         }
         List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 40ae0ea..b68cbf9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -329,21 +329,21 @@ public class WriteFilesTest {
     WriteFiles<String, ?, String> write = WriteFiles.to(sink).withNumShards(3);
     assertThat((SimpleSink<Void>) write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
-        write.getSharding();
+        write.getComputeNumShards();
 
-    assertThat(write.getSharding(), is(nullValue()));
-    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
-    assertThat(write.getNumShards().get(), equalTo(3));
-    assertThat(write.getSharding(), equalTo(originalSharding));
+    assertThat(write.getComputeNumShards(), is(nullValue()));
+    assertThat(write.getNumShardsProvider(), instanceOf(StaticValueProvider.class));
+    assertThat(write.getNumShardsProvider().get(), equalTo(3));
+    assertThat(write.getComputeNumShards(), equalTo(originalSharding));
 
     WriteFiles<String, ?, ?> write2 = write.withSharding(SHARDING_TRANSFORM);
     assertThat((SimpleSink<Void>) write2.getSink(), is(sink));
-    assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
+    assertThat(write2.getComputeNumShards(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
     WriteFiles<String, ?, ?> writeUnsharded = write2.withRunnerDeterminedSharding();
-    assertThat(writeUnsharded.getSharding(), nullValue());
-    assertThat(write.getSharding(), equalTo(originalSharding));
+    assertThat(writeUnsharded.getComputeNumShards(), nullValue());
+    assertThat(write.getComputeNumShards(), equalTo(originalSharding));
   }
 
   @Test
@@ -669,10 +669,10 @@ public class WriteFilesTest {
     p.run();
 
     Optional<Integer> numShards =
-        (write.getNumShards() != null && !write.isWindowedWrites())
-            ? Optional.of(write.getNumShards().get())
+        (write.getNumShardsProvider() != null && !write.getWindowedWrites())
+            ? Optional.of(write.getNumShardsProvider().get())
             : Optional.<Integer>absent();
-    checkFileContents(baseName, inputs, numShards, !write.isWindowedWrites());
+    checkFileContents(baseName, inputs, numShards, !write.getWindowedWrites());
   }
 
   static void checkFileContents(

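The conversion follows the usual AutoValue builder pattern: getters become abstract methods,
construction goes through an @AutoValue.Builder, and the with* methods rebuild the value via
toBuilder(). A small hypothetical class showing the same shape (this is not the real
WriteFiles; it assumes the AutoValue annotation processor is on the compile classpath, which
generates AutoValue_ShardingConfig):

    import com.google.auto.value.AutoValue;
    import javax.annotation.Nullable;

    @AutoValue
    abstract class ShardingConfig {
      // Nullable here means "runner-determined sharding", mirroring getNumShardsProvider().
      @Nullable
      abstract Integer getNumShards();

      abstract boolean getWindowedWrites();

      abstract Builder toBuilder();

      static ShardingConfig create() {
        return new AutoValue_ShardingConfig.Builder()
            .setNumShards(null)
            .setWindowedWrites(false)
            .build();
      }

      // with* methods keep the value type immutable, like withNumShards(...) etc. above.
      ShardingConfig withNumShards(int numShards) {
        return toBuilder().setNumShards(numShards).build();
      }

      ShardingConfig withWindowedWrites() {
        return toBuilder().setWindowedWrites(true).build();
      }

      @AutoValue.Builder
      abstract static class Builder {
        abstract Builder setNumShards(@Nullable Integer numShards);

        abstract Builder setWindowedWrites(boolean windowedWrites);

        abstract ShardingConfig build();
      }
    }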

[beam] 12/13: Reintroduces dynamic sharding with windowed writes for bounded collections

Posted by jk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d314339ed2f8d5ce385c7b40705ef13f6ea43b45
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Thu Nov 30 13:05:00 2017 -0800

    Reintroduces dynamic sharding with windowed writes for bounded collections
---
 .../apache/beam/examples/WindowedWordCount.java    |  5 ++--
 .../examples/common/WriteOneFilePerWindow.java     | 12 ++++++----
 .../apache/beam/examples/WindowedWordCountIT.java  |  8 +++++++
 .../beam/runners/apex/examples/WordCountTest.java  |  2 +-
 .../construction/WriteFilesTranslationTest.java    |  1 +
 .../beam/runners/spark/io/AvroPipelineTest.java    |  5 ++--
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 27 +++-------------------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 27 +++++++++++-----------
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |  5 ++--
 9 files changed, 41 insertions(+), 51 deletions(-)

diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 21cfed8..b31ce4a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -162,9 +162,8 @@ public class WindowedWordCount {
     void setMaxTimestampMillis(Long value);
 
     @Description("Fixed number of shards to produce per window")
-    @Default.Integer(3)
-    int getNumShards();
-    void setNumShards(int numShards);
+    Integer getNumShards();
+    void setNumShards(Integer numShards);
   }
 
   public static void main(String[] args) throws IOException {
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index a5c84f6..abd14b7 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,6 +19,7 @@ package org.apache.beam.examples.common;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
@@ -45,9 +46,10 @@ import org.joda.time.format.ISODateTimeFormat;
 public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone> {
   private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
   private String filenamePrefix;
-  private int numShards;
+  @Nullable
+  private Integer numShards;
 
-  public WriteOneFilePerWindow(String filenamePrefix, int numShards) {
+  public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
     this.filenamePrefix = filenamePrefix;
     this.numShards = numShards;
   }
@@ -59,8 +61,10 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
         TextIO.write()
             .to(new PerWindowFiles(resource))
             .withTempDirectory(resource.getCurrentDirectory())
-            .withWindowedWrites()
-            .withNumShards(numShards);
+            .withWindowedWrites();
+    if (numShards != null) {
+      write = write.withNumShards(numShards);
+    }
     return input.apply(write);
   }
 
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index 279de53..2f4ef34 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -87,6 +87,14 @@ public class WindowedWordCountIT {
   }
 
   @Test
+  public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
+    WindowedWordCountITOptions options = batchOptions();
+    // This is the default value, but make it explicit.
+    options.setNumShards(null);
+    testWindowedWordCountPipeline(options);
+  }
+
+  @Test
   public void testWindowedWordCountInBatchStaticSharding() throws Exception {
     WindowedWordCountITOptions options = batchOptions();
     options.setNumShards(3);
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e050c15..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -108,7 +108,7 @@ public class WordCountTest {
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))
-      .apply("WriteCounts", TextIO.write().to(options.getOutput()).withNumShards(2))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()))
       ;
     p.run().waitUntilFinish();
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
index 2d45681..038653d 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WriteFilesTranslationTest.java
@@ -64,6 +64,7 @@ public class WriteFilesTranslationTest {
     public static Iterable<WriteFiles<Object, Void, Object>> data() {
       return ImmutableList.of(
           WriteFiles.to(new DummySink()),
+          WriteFiles.to(new DummySink()).withWindowedWrites(),
           WriteFiles.to(new DummySink()).withNumShards(17),
           WriteFiles.to(new DummySink()).withWindowedWrites().withNumShards(42));
     }
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index e17a6b8..fc65aac 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -74,8 +74,7 @@ public class AvroPipelineTest {
         AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath()));
     input.apply(
         AvroIO.writeGenericRecords(schema)
-            .to(outputFile.getAbsolutePath())
-            .withoutSharding());
+            .to(outputFile.getAbsolutePath()));
     pipeline.run();
 
     List<GenericRecord> records = readGenericFile();
@@ -100,7 +99,7 @@ public class AvroPipelineTest {
     List<GenericRecord> records = Lists.newArrayList();
     GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<>();
     try (DataFileReader<GenericRecord> dataFileReader =
-        new DataFileReader<>(outputFile, genericDatumReader)) {
+        new DataFileReader<>(new File(outputFile + "-00000-of-00001"), genericDatumReader)) {
       for (GenericRecord record : dataFileReader) {
         records.add(record);
       }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 48d7521..2e5d387 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -32,7 +32,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.io.InputStream;
@@ -43,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -634,28 +632,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       if (numShards != null) {
         resultsWithShardNumbers = Lists.newArrayList(completeResults);
       } else {
-        checkState(
-            !windowedWrites,
-            "When doing windowed writes, shards should have been assigned when writing");
-        // Sort files for idempotence. Sort by temporary filename.
-        // Note that this codepath should not be used when processing triggered windows. In the
-        // case of triggers, the list of FileResult objects in the Finalize iterable is not
-        // deterministic, and might change over retries. This breaks the assumption below that
-        // sorting the FileResult objects provides idempotency.
-        List<FileResult<DestinationT>> sortedByTempFilename =
-            Ordering.from(
-                    new Comparator<FileResult<DestinationT>>() {
-                      @Override
-                      public int compare(
-                          FileResult<DestinationT> first, FileResult<DestinationT> second) {
-                        String firstFilename = first.getTempFilename().toString();
-                        String secondFilename = second.getTempFilename().toString();
-                        return firstFilename.compareTo(secondFilename);
-                      }
-                    })
-                .sortedCopy(completeResults);
-        for (int i = 0; i < sortedByTempFilename.size(); i++) {
-          resultsWithShardNumbers.add(sortedByTempFilename.get(i).withShard(i));
+        int i = 0;
+        for (FileResult<DestinationT> res : completeResults) {
+          resultsWithShardNumbers.add(res.withShard(i++));
         }
       }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 54f055d..499a194 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.base.Objects;
@@ -42,8 +41,8 @@ import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -286,13 +285,12 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
           getWindowedWrites(),
           "Must use windowed writes when applying %s to an unbounded PCollection",
           WriteFiles.class.getSimpleName());
-    }
-    if (getWindowedWrites()) {
       // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
       // and similar behavior in other runners.
       checkArgument(
           getComputeNumShards() != null || getNumShardsProvider() != null,
-          "When using windowed writes, must specify number of output shards explicitly",
+          "When applying %s to an unbounded PCollection, "
+              + "must specify number of output shards explicitly",
           WriteFiles.class.getSimpleName());
     }
     this.writeOperation = getSink().createWriteOperation();
@@ -364,7 +362,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
   }
 
   private class GatherResults<ResultT>
-      extends PTransform<PCollection<ResultT>, PCollection<Iterable<ResultT>>> {
+      extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
     private final Coder<ResultT> resultCoder;
 
     private GatherResults(Coder<ResultT> resultCoder) {
@@ -372,7 +370,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     }
 
     @Override
-    public PCollection<Iterable<ResultT>> expand(PCollection<ResultT> input) {
+    public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
       if (getWindowedWrites()) {
         // Reshuffle the results to make them stable against retries.
         // Use a single void key to maximize size of bundles for finalization.
@@ -381,7 +379,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
             .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
             .apply("Drop key", Values.<ResultT>create())
             .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
-            .setCoder(IterableCoder.of(resultCoder));
+            .setCoder(ListCoder.of(resultCoder))
+            // Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
+            .apply(Reshuffle.<List<ResultT>>viaRandomKey());
       } else {
         // Pass results via a side input rather than reshuffle, because we need to get an empty
         // iterable to finalize if there are no results.
@@ -389,7 +389,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
             .getPipeline()
             .apply(
                 Reify.viewInGlobalWindow(
-                    input.apply(View.<ResultT>asIterable()), IterableCoder.of(resultCoder)));
+                    input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder)));
       }
     }
   }
@@ -742,7 +742,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
 
   private class FinalizeTempFileBundles
       extends PTransform<
-          PCollection<Iterable<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
+          PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
     @Nullable private final PCollectionView<Integer> numShardsView;
     private final Coder<DestinationT> destinationCoder;
 
@@ -754,7 +754,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
 
     @Override
     public WriteFilesResult<DestinationT> expand(
-        PCollection<Iterable<FileResult<DestinationT>>> input) {
+        PCollection<List<FileResult<DestinationT>>> input) {
 
       List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
       if (numShardsView != null) {
@@ -772,7 +772,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     }
 
     private class FinalizeFn
-        extends DoFn<Iterable<FileResult<DestinationT>>, KV<DestinationT, String>> {
+        extends DoFn<List<FileResult<DestinationT>>, KV<DestinationT, String>> {
       @ProcessElement
       public void process(ProcessContext c) throws Exception {
         getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@@ -782,7 +782,6 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
         } else if (getNumShardsProvider() != null) {
           fixedNumShards = getNumShardsProvider().get();
         } else {
-          checkState(!getWindowedWrites(), "Windowed write should have set fixed sharding");
           fixedNumShards = null;
         }
         List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
@@ -821,7 +820,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     return resultsToFinalFilenames;
   }
 
-  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, Iterable<T>> {
+  private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
     @Nullable private transient Multimap<BoundedWindow, T> bundles = null;
 
     @StartBundle
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index b68cbf9..da4e6da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -386,10 +386,11 @@ public class WriteFilesTest {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testWindowedWritesNeedSharding() {
+  public void testUnboundedWritesNeedSharding() {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(
-        "When using windowed writes, must specify number of output shards explicitly");
+        "When applying WriteFiles to an unbounded PCollection, "
+            + "must specify number of output shards explicitly");
 
     SimpleSink<Void> sink = makeSimpleSink();
     p.apply(Create.of("foo"))

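A note on the user-facing effect of the tightened precondition above: any pipeline that writes an unbounded PCollection through a file-based sink now has to declare its sharding up front. A minimal sketch of a conforming pipeline, assuming a made-up output prefix and window size (GenerateSequence with a rate is used only to keep the example self-contained; any unbounded source would do):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.joda.time.Duration;

    public class UnboundedShardedWriteExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1))) // unbounded
            .apply(MapElements.into(TypeDescriptors.strings()).via((Long x) -> Long.toString(x)))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(
                TextIO.write()
                    .to("/tmp/write-files-example/part") // hypothetical output prefix
                    .withWindowedWrites()
                    .withNumShards(3)); // explicit sharding, required for unbounded input
        p.run().waitUntilFinish();
      }
    }

Without the withNumShards call, applying the write now fails with the IllegalArgumentException that testUnboundedWritesNeedSharding above expects.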

[beam] 10/13: Renames spilled back to unwritten

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 83837eb20c1e1e4793f8410de1dc6d5864586f7f
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 22 16:34:46 2017 -0800

    Renames spilled back to unwritten
---
 .../src/main/java/org/apache/beam/sdk/io/WriteFiles.java    | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 7e04332..12f5cce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -403,22 +403,23 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     @Override
     public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
       TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
-      TupleTag<KV<ShardedKey<Integer>, UserT>> spilledRecordsTag = new TupleTag<>("spilledRecords");
+      TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
+          new TupleTag<>("unwrittenRecords");
       PCollectionTuple writeTuple =
           input.apply(
               "WriteUnshardedBundles",
               ParDo.of(
                       new WriteUnshardedTempFilesWithSpillingFn(
-                          spilledRecordsTag, destinationCoder))
+                          unwrittenRecordsTag, destinationCoder))
                   .withSideInputs(getSideInputs())
-                  .withOutputTags(writtenRecordsTag, TupleTagList.of(spilledRecordsTag)));
+                  .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
           writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
       // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
       // finalize to stay consistent with what WriteWindowedBundles does.
       PCollection<FileResult<DestinationT>> writtenSpilledFiles =
           writeTuple
-              .get(spilledRecordsTag)
+              .get(unwrittenRecordsTag)
               .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
               // Here we group by a synthetic shard number in the range [0, spill factor),
               // just for the sake of getting some parallelism within each destination when
@@ -426,9 +427,9 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               // number assigned at all. Drop the shard number on the spilled records so that
               // shard numbers are assigned together to both the spilled and non-spilled files in
               // finalize.
-              .apply("GroupSpilled", GroupByKey.<ShardedKey<Integer>, UserT>create())
+              .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create())
               .apply(
-                  "WriteSpilled",
+                  "WriteUnwritten",
                   ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
               .setCoder(fileResultCoder)
               .apply(

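The renaming above is cosmetic, but the code it touches is a standard Beam multi-output ParDo: the main output carries records that were written inline, and a second TupleTag carries the ones handed off to the sharded pass. A condensed sketch of that shape, with an invented routing condition standing in for the real "too many open writers in this bundle" check:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    public class WriteOrSpillExample {
      // Anonymous subclasses so the element type survives erasure and coders can be inferred.
      static final TupleTag<String> WRITTEN = new TupleTag<String>() {};
      static final TupleTag<String> UNWRITTEN = new TupleTag<String>() {};

      public static PCollectionTuple splitByBudget(PCollection<String> records) {
        return records.apply(
            "WriteOrSpill",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void process(ProcessContext c) {
                        // Invented condition; the real DoFn spills once its writer budget is hit.
                        if (c.element().hashCode() % 5 != 0) {
                          c.output(c.element());            // main output: written in place
                        } else {
                          c.output(UNWRITTEN, c.element()); // side output: left for the sharded pass
                        }
                      }
                    })
                .withOutputTags(WRITTEN, TupleTagList.of(UNWRITTEN)));
      }
    }

A caller reads the two branches back with splitByBudget(records).get(WRITTEN) and .get(UNWRITTEN), which mirrors how writeTuple is consumed above.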

[beam] 09/13: Makes checkstyle and findbugs happy

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3ecf13b9fc889f1978efef57f14f610766242901
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Fri Nov 17 15:41:20 2017 -0800

    Makes checkstyle and findbugs happy
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java  | 2 +-
 .../core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java     | 8 ++++----
 3 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 3467d53..edf513b 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1344,7 +1344,7 @@ public class DataflowRunnerTest implements Serializable {
         (WriteFiles<Object, Void, Object>)
             factory.getReplacementTransform(originalApplication).getTransform();
     assertThat(replacement, not(equalTo((Object) original)));
-    assertThat(replacement.getNumShards().get(), equalTo(expectedNumShards));
+    assertThat(replacement.getNumShardsProvider().get(), equalTo(expectedNumShards));
   }
 
   private static class TestSink extends FileBasedSink<Object, Void, Object> {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 20d2a27..12c4555 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -874,7 +874,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     /** Unique id for this output bundle. */
     private @Nullable String id;
 
-    private DestinationT destination;
+    private @Nullable DestinationT destination;
 
     /** The output file for this bundle. May be null if opening failed. */
     private @Nullable ResourceId outputFile;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index d6c5788..7e04332 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -170,10 +170,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
         FileBasedSink<UserT, DestinationT, OutputT> sink);
 
     abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards(
-        PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards);
+        @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards);
 
     abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider(
-        ValueProvider<Integer> numShardsProvider);
+        @Nullable ValueProvider<Integer> numShardsProvider);
 
     abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites);
 
@@ -604,12 +604,12 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
       extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
     private final Coder<DestinationT> destinationCoder;
     private final Coder<FileResult<DestinationT>> fileResultCoder;
-    private final PCollectionView<Integer> numShardsView;
+    private final @Nullable PCollectionView<Integer> numShardsView;
 
     private WriteShardedBundlesToTempFiles(
         Coder<DestinationT> destinationCoder,
         Coder<FileResult<DestinationT>> fileResultCoder,
-        PCollectionView<Integer> numShardsView) {
+        @Nullable PCollectionView<Integer> numShardsView) {
       this.destinationCoder = destinationCoder;
       this.fileResultCoder = fileResultCoder;
       this.numShardsView = numShardsView;

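The @Nullable annotations added above follow the usual AutoValue convention for optional properties: mark both the abstract accessor and the builder setter, and leave runtime behavior unchanged. A minimal sketch of that pattern on an invented class:

    import com.google.auto.value.AutoValue;
    import javax.annotation.Nullable;

    @AutoValue
    abstract class ShardingSpec {
      /** Null means "let the runner pick the sharding". */
      abstract @Nullable Integer getNumShards();

      static Builder builder() {
        return new AutoValue_ShardingSpec.Builder();
      }

      @AutoValue.Builder
      abstract static class Builder {
        abstract Builder setNumShards(@Nullable Integer numShards);

        abstract ShardingSpec build();
      }
    }

Leaving the property unset stays legal (ShardingSpec.builder().build() yields a null getNumShards()), which is the same contract WriteFiles relies on for its optional sharding controls.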

[beam] 01/13: enforce fixed sharding

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 90402e4f21dbbf60ed498a67a5dfc3e276dcb07c
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 18:06:39 2017 -0800

    enforce fixed sharding
---
 sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 8328d7b..c99abce 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -854,7 +854,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
         } else if (numShardsProvider != null) {
           fixedNumShards = numShardsProvider.get();
         } else {
-          fixedNumShards = null;
+          throw new IllegalStateException(
+              "When finalizing a windowed write, should have set fixed sharding");
         }
       }
     }

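The replaced branch turns a silently-null shard count into a fail-fast error when finalizing a windowed write. The same invariant written with Guava's checkState, on invented helper names, would be:

    import static com.google.common.base.Preconditions.checkState;

    import javax.annotation.Nullable;

    class FixedShardingResolver {
      /** Resolves the shard count for a windowed write, failing fast if neither source set it. */
      static int resolveFixedNumShards(
          @Nullable Integer numShardsFromSideInput, @Nullable Integer numShardsFromProvider) {
        if (numShardsFromSideInput != null) {
          return numShardsFromSideInput;
        }
        checkState(
            numShardsFromProvider != null,
            "When finalizing a windowed write, should have set fixed sharding");
        return numShardsFromProvider;
      }
    }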

[beam] 11/13: Fixes tests

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 060f05c659920c3a48dbc67c38db770788802d06
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Mon Nov 27 11:41:25 2017 -0800

    Fixes tests
---
 .../core/construction/WriteFilesTranslation.java   |  2 +-
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 32 ++++++++---------
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 17 +++++++--
 .../org/apache/beam/sdk/io/FileBasedSinkTest.java  | 42 ++++++++++++----------
 4 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
index a6dd55c..90f6453 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WriteFilesTranslation.java
@@ -303,7 +303,7 @@ public class WriteFilesTranslation {
     public Map<Class<? extends PTransform>, TransformPayloadTranslator>
         getTransformPayloadTranslators() {
       return Collections.<Class<? extends PTransform>, TransformPayloadTranslator>singletonMap(
-          WriteFiles.class, new WriteFilesTranslator());
+          WriteFiles.CONCRETE_CLASS, new WriteFilesTranslator());
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 12c4555..48d7521 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -699,7 +699,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       // if set.
       Set<Integer> missingShardNums;
       if (numShards == null) {
-        missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM);
+        missingShardNums =
+            existingResults.isEmpty()
+                ? ImmutableSet.of(UNKNOWN_SHARDNUM)
+                : ImmutableSet.<Integer>of();
       } else {
         missingShardNums = Sets.newHashSet();
         for (int i = 0; i < numShards; ++i) {
@@ -726,8 +729,9 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
           String uuid = UUID.randomUUID().toString();
           LOG.info("Opening empty writer {} for destination {}", uuid, dest);
           Writer<DestinationT, ?> writer = createWriter();
+          writer.setDestination(dest);
           // Currently this code path is only called in the unwindowed case.
-          writer.open(uuid, dest);
+          writer.open(uuid);
           writer.close();
           completeResults.add(
               new FileResult<>(
@@ -760,8 +764,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames) throws IOException {
       int numFiles = resultsToFinalFilenames.size();
       LOG.debug("Copying {} files.", numFiles);
-      List<ResourceId> srcFiles = new ArrayList<>(resultsToFinalFilenames.size());
-      List<ResourceId> dstFiles = new ArrayList<>(resultsToFinalFilenames.size());
+      List<ResourceId> srcFiles = new ArrayList<>();
+      List<ResourceId> dstFiles = new ArrayList<>();
       for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
         srcFiles.add(entry.getKey().getTempFilename());
         dstFiles.add(entry.getValue());
@@ -923,22 +927,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     protected void finishWrite() throws Exception {}
 
     /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
+     * Opens a uniquely named temporary file and initializes the writer using {@link #prepareWrite}.
      *
      * <p>The unique id that is given to open should be used to ensure that the writer's output does
      * not interfere with the output of other Writers, as a bundle may be executed many times for
      * fault tolerance.
-     *
-     * <p>The window and paneInfo arguments are populated when windowed writes are requested. shard
-     * id populated for the case of static sharding. In cases where the runner is dynamically
-     * picking sharding, shard might be set to -1.
      */
-    public final void open(
-        String uId, DestinationT destination)
-        throws Exception {
+    public final void open(String uId) throws Exception {
       this.id = uId;
-      this.destination = destination;
       ResourceId tempDirectory = getWriteOperation().tempDirectory.get();
       outputFile = tempDirectory.resolve(id, StandardResolveOptions.RESOLVE_FILE);
       verifyNotNull(
@@ -1040,6 +1036,10 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
       return writeOperation;
     }
 
+    void setDestination(DestinationT destination) {
+      this.destination = destination;
+    }
+
     /** Return the user destination object for this writer. */
     public DestinationT getDestination() {
       return destination;
@@ -1064,8 +1064,8 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         BoundedWindow window,
         PaneInfo paneInfo,
         DestinationT destination) {
-      checkArgument(window != null);
-      checkArgument(paneInfo != null);
+      checkArgument(window != null, "window can not be null");
+      checkArgument(paneInfo != null, "paneInfo can not be null");
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 12f5cce..54f055d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
@@ -114,6 +115,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
     extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
 
+  /** For internal use by runners. */
+  @Internal
+  public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class;
+
   // The maximum number of file writers to keep open in a single bundle at a time, since file
   // writers default to 64mb buffers. This comes into play when writing per-window files.
   // The first 20 files from a single WriteFiles transform will write files inline in the
@@ -497,7 +502,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               paneInfo,
               destination);
           writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
+          writer.setDestination(destination);
+          writer.open(uuid);
           writers.put(key, writer);
           LOG.debug("Done opening writer");
         } else {
@@ -623,7 +629,7 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               "ApplyShardingKey",
               ParDo.of(new ApplyShardingKeyFn(numShardsView, destinationCoder))
                   .withSideInputs(
-                      numShardsView == null
+                      (numShardsView == null)
                           ? ImmutableList.<PCollectionView<Integer>>of()
                           : ImmutableList.of(numShardsView)))
           .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
@@ -706,7 +712,8 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
               c.pane(),
               destination);
           writer = writeOperation.createWriter();
-          writer.open(uuid, destination);
+          writer.setDestination(destination);
+          writer.open(uuid);
           writers.put(destination, writer);
         }
         writeOrClose(writer, getDynamicDestinations().formatRecord(input));
@@ -724,6 +731,10 @@ public abstract class WriteFiles<UserT, DestinationT, OutputT>
           throw e;
         }
         int shard = c.element().getKey().getShardNumber();
+        checkArgument(
+            shard != UNKNOWN_SHARDNUM,
+            "Shard should have been set, but is unset for element %s",
+            c.element());
         c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
       }
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 561d036..0c9bdc1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -52,6 +52,8 @@ import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream;
@@ -100,7 +102,7 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter<Void> writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.open(testUid, null);
+    writer.open(testUid);
     for (String value : values) {
       writer.write(value);
     }
@@ -196,14 +198,14 @@ public class FileBasedSinkTest {
           new FileResult<Void>(
               LocalResources.fromFile(temporaryFiles.get(i), false),
               UNKNOWN_SHARDNUM,
-              null,
-              null,
+              GlobalWindow.INSTANCE,
+              PaneInfo.ON_TIME_AND_ONLY_FIRING,
               null));
     }
 
     // TODO: test with null first argument?
     List<KV<FileResult<Void>, ResourceId>> resultsToFinalFilenames =
-        writeOp.finalizeDestination(null, null, null, fileResults);
+        writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, null, fileResults);
     writeOp.moveToOutputFiles(resultsToFinalFilenames);
 
     for (int i = 0; i < numFiles; i++) {
@@ -213,7 +215,7 @@ public class FileBasedSinkTest {
               .getDynamicDestinations()
               .getFilenamePolicy(null)
               .unwindowedFilename(i, numFiles, CompressionType.UNCOMPRESSED);
-      assertTrue(new File(outputFilename.toString()).exists());
+      assertTrue(outputFilename.toString(), new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
@@ -292,7 +294,11 @@ public class FileBasedSinkTest {
       resultsToFinalFilenames.add(
           KV.of(
               new FileResult<Void>(
-                  LocalResources.fromFile(inputTmpFile, false), UNKNOWN_SHARDNUM, null, null, null),
+                  LocalResources.fromFile(inputTmpFile, false),
+                  UNKNOWN_SHARDNUM,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  null),
               finalFilename));
     }
 
@@ -354,22 +360,22 @@ public class FileBasedSinkTest {
         SimpleSink.makeSimpleSink(root, "file", "-NN", "test", Compression.UNCOMPRESSED);
     SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
 
-    ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
-    ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
-    ResourceId temp3 = root.resolve("temp3", StandardResolveOptions.RESOLVE_FILE);
-    // More than one shard does.
     try {
-      List<FileResult<Void>> results =
-          Lists.newArrayList(
-              new FileResult<Void>(temp1, 1 /* shard */, null, null, null),
-              new FileResult<Void>(temp2, 1 /* shard */, null, null, null),
-              new FileResult<Void>(temp3, 1 /* shard */, null, null, null));
-      writeOp.finalizeDestination(null, null, 5 /* numShards */, results);
+      List<FileResult<Void>> results = Lists.newArrayList();
+      for (int i = 0; i < 3; ++i) {
+        results.add(new FileResult<Void>(
+            root.resolve("temp" + i, StandardResolveOptions.RESOLVE_FILE),
+            1 /* shard - should be different, but is the same */,
+            GlobalWindow.INSTANCE,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING,
+            null));
+      }
+      writeOp.finalizeDestination(null, GlobalWindow.INSTANCE, 5 /* numShards */, results);
       fail("Should have failed.");
     } catch (IllegalArgumentException exn) {
       assertThat(exn.getMessage(), containsString("generated the same name"));
+      assertThat(exn.getMessage(), containsString("temp0"));
       assertThat(exn.getMessage(), containsString("temp1"));
-      assertThat(exn.getMessage(), containsString("temp2"));
     }
   }
 
@@ -505,7 +511,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.open(testUid, null);
+    writer.open(testUid);
     writer.write("a");
     writer.write("b");
     writer.close();

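With the fixes above in place, the per-bundle writer lifecycle settles into: create the writer, attach its destination, open a uniquely named temp file, write, close, and leave finalization to the later finalize step. A condensed restatement of that call order follows; this is SDK-internal API in org.apache.beam.sdk.io (setDestination is package-private), so the fragment only illustrates the sequence, with a Void destination and String records as placeholders:

    // Assumes a FileBasedSink.WriteOperation<Void, String> named writeOperation is in scope,
    // as in WriteShardsIntoTempFilesFn above, and that java.util.UUID is imported.
    FileBasedSink.Writer<Void, String> writer = writeOperation.createWriter();
    writer.setDestination(null);                  // Void destination in this sketch
    writer.open(UUID.randomUUID().toString());    // unique id guards against bundle retries
    writer.write("example record");
    writer.close();                               // temp file is moved to its final name later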