You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/12/06 00:29:50 UTC

[beam] 03/13: non-null window/pane in FileResult

This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit b2d0671185fa1bd7f100853c7921e555c84578e7
Author: Eugene Kirpichov <ki...@google.com>
AuthorDate: Wed Nov 15 18:25:14 2017 -0800

    Require a non-null window and pane in FileResult
---
 .../java/org/apache/beam/sdk/io/FileBasedSink.java | 10 ++++--
 .../java/org/apache/beam/sdk/io/WriteFiles.java    | 38 ++++++++++------------
 2 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 2108253..c8bdbfc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -655,6 +655,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
         checkArgument(
             result.getShard() != UNKNOWN_SHARDNUM, "Should have set shard number on %s", result);
         ResourceId finalFilename = result.getDestinationFile(
+            windowedWrites,
             getSink().getDynamicDestinations(),
             effectiveNumShards,
             getSink().getWritableByteChannelFactory());
@@ -984,7 +985,7 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
   public static final class FileResult<DestinationT> {
     private final ResourceId tempFilename;
     private final int shard;
-    private final @Nullable BoundedWindow window;
+    private final BoundedWindow window;
     private final PaneInfo paneInfo;
     private final DestinationT destination;
 
@@ -992,9 +993,11 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
     public FileResult(
         ResourceId tempFilename,
         int shard,
-        @Nullable BoundedWindow window,
+        BoundedWindow window,
         PaneInfo paneInfo,
         DestinationT destination) {
+      checkArgument(window != null);
+      checkArgument(paneInfo != null);
       this.tempFilename = tempFilename;
       this.shard = shard;
       this.window = window;
@@ -1029,13 +1032,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
 
     @Experimental(Kind.FILESYSTEM)
     public ResourceId getDestinationFile(
+        boolean windowedWrites,
         DynamicDestinations<?, DestinationT, ?> dynamicDestinations,
         int numShards,
         OutputFileHints outputFileHints) {
       checkArgument(getShard() != UNKNOWN_SHARDNUM);
       checkArgument(numShards > 0);
       FilenamePolicy policy = dynamicDestinations.getFilenamePolicy(destination);
-      if (getWindow() != null) {
+      if (windowedWrites) {
         return policy.windowedFilename(
             getShard(), numShards, getWindow(), getPaneInfo(), outputFileHints);
       } else {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 35b28a1..19457e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -71,6 +71,7 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -387,7 +388,6 @@ public class WriteFiles<UserT, DestinationT, OutputT>
   private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> {
     private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
     private final Coder<DestinationT> destinationCoder;
-    private final boolean windowedWrites;
 
     // Initialized in startBundle()
     private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
@@ -395,10 +395,8 @@ public class WriteFiles<UserT, DestinationT, OutputT>
     private int spilledShardNum = UNKNOWN_SHARDNUM;
 
     WriteBundles(
-        boolean windowedWrites,
         TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
         Coder<DestinationT> destinationCoder) {
-      this.windowedWrites = windowedWrites;
       this.unwrittenRecordsTag = unwrittenRecordsTag;
       this.destinationCoder = destinationCoder;
     }
@@ -466,13 +464,11 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           throw e;
         }
         BoundedWindow window = key.window;
-        FileResult<DestinationT> res =
-            windowedWrites
-                ? new FileResult<>(
-                    writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination)
-                : new FileResult<>(
-                    writer.getOutputFile(), UNKNOWN_SHARDNUM, null, null, key.destination);
-        c.output(res, window.maxTimestamp(), window);
+        c.output(
+            new FileResult<>(
+                writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination),
+            window.maxTimestamp(),
+            window);
       }
     }
 
@@ -535,14 +531,9 @@ public class WriteFiles<UserT, DestinationT, OutputT>
             shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING
                 ? c.element().getKey().getShardNumber()
                 : UNKNOWN_SHARDNUM;
-        if (windowedWrites) {
-          c.output(
-              new FileResult<>(
-                  writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
-        } else {
-          c.output(
-              new FileResult<>(writer.getOutputFile(), shardNumber, null, null, entry.getKey()));
-        }
+        c.output(
+            new FileResult<>(
+                writer.getOutputFile(), shardNumber, window, c.pane(), entry.getKey()));
       }
     }
 
@@ -706,7 +697,7 @@ public class WriteFiles<UserT, DestinationT, OutputT>
       PCollectionTuple writeTuple =
           input.apply(
               writeName,
-              ParDo.of(new WriteBundles(windowedWrites, unwrittedRecordsTag, destinationCoder))
+              ParDo.of(new WriteBundles(unwrittedRecordsTag, destinationCoder))
                   .withSideInputs(sideInputs)
                   .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittedRecordsTag)));
       PCollection<FileResult<DestinationT>> writtenBundleFiles =
@@ -1011,14 +1002,19 @@ public class WriteFiles<UserT, DestinationT, OutputT>
           writer.open(uuid, destination);
           writer.close();
           completeResults.add(
-              new FileResult<>(writer.getOutputFile(), shard, null, null, destination));
+              new FileResult<>(
+                  writer.getOutputFile(),
+                  shard,
+                  GlobalWindow.INSTANCE,
+                  PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                  destination));
         }
         LOG.debug("Done creating extra shards for {}.", destination);
       }
       return
           writeOperation.buildOutputFilenames(
               destination,
-              null,
+              GlobalWindow.INSTANCE,
               (fixedNumShards == null) ? null : completeResults.size(),
               completeResults);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.