You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:30 UTC
[28/50] beam git commit: ReduceFnRunner.onTrigger: add short circuit
for empty pane, and move inputWM and pane after the short circuit.
ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after the short circuit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2efb0d56
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2efb0d56
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2efb0d56
Branch: refs/heads/gearpump-runner
Commit: 2efb0d561fc62ba44bf71db6937a54708944f0f6
Parents: 38dd12d
Author: Author: 波特 <ha...@alibaba-inc.com>
Authored: Fri May 26 17:46:55 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Thu Jun 29 14:01:54 2017 +0800
----------------------------------------------------------------------
.../org/apache/beam/runners/core/ReduceFnRunner.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2efb0d56/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index a33bac1..ef33bef 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -953,11 +953,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished, boolean isEndOfWindow)
throws Exception {
- Instant inputWM = timerInternals.currentInputWatermarkTime();
-
- // Calculate the pane info.
- final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
-
// Extract the window hold, and as a side effect clear it.
final WatermarkHold.OldAndNewHolds pair =
watermarkHold.extractAndRelease(renamedContext, isFinished).read();
@@ -966,7 +961,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
@Nullable Instant newHold = pair.newHold;
final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read();
+ if (isEmpty
+ && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY
+ && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) {
+ return newHold;
+ }
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
if (newHold != null) {
// We can't be finished yet.
checkState(
@@ -998,6 +999,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
}
}
+ // Calculate the pane info.
+ final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read();
+
// Only emit a pane if it has data or empty panes are observable.
if (needToEmit(isEmpty, isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.