You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:30 UTC

[28/50] beam git commit: ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after the short circuit.

ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after the short circuit.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2efb0d56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2efb0d56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2efb0d56

Branch: refs/heads/gearpump-runner
Commit: 2efb0d561fc62ba44bf71db6937a54708944f0f6
Parents: 38dd12d
Author: 波特 <ha...@alibaba-inc.com>
Authored: Fri May 26 17:46:55 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Thu Jun 29 14:01:54 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/core/ReduceFnRunner.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2efb0d56/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a33bac1..ef33bef 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -953,11 +953,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       final boolean isFinished, boolean isEndOfWindow)
           throws Exception {
-    Instant inputWM = timerInternals.currentInputWatermarkTime();
-
-    // Calculate the pane info.
-    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
-
     // Extract the window hold, and as a side effect clear it.
     final WatermarkHold.OldAndNewHolds pair =
         watermarkHold.extractAndRelease(renamedContext, isFinished).read();
@@ -966,7 +961,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
     @Nullable Instant newHold = pair.newHold;
 
     final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+    if (isEmpty
+        && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY
+        && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
+      return newHold;
+    }
 
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
     if (newHold != null) {
       // We can't be finished yet.
       checkState(
@@ -998,6 +999,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
       }
     }
 
+    // Calculate the pane info.
+    final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
+
     // Only emit a pane if it has data or empty panes are observable.
     if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.