You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/03 04:05:41 UTC
[1/3] incubator-beam git commit: Fix up checkstyle
Repository: incubator-beam
Updated Branches:
refs/heads/master f87f35b70 -> 7582212f5
Fix up checkstyle
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/542b8db3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/542b8db3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/542b8db3
Branch: refs/heads/master
Commit: 542b8db385dce20954d3c0858686fd354335e3ed
Parents: 968494f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Mar 2 18:53:08 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Mar 2 19:01:09 2016 -0800
----------------------------------------------------------------------
.../cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/542b8db3/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
index a14be44..1da2b43 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineDebugOptions.java
@@ -172,7 +172,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
/**
* Whether to update the currently running pipeline with the same name as this one.
- *
+ *
* @deprecated This property is replaced by @{link DataflowPipelineOptions#getUpdate()}
*/
@Deprecated
[3/3] incubator-beam git commit: This closes #13
Posted by ke...@apache.org.
This closes #13
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7582212f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7582212f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7582212f
Branch: refs/heads/master
Commit: 7582212f58deb8628dfabfb26b63c9391ee3209a
Parents: f87f35b 542b8db
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Mar 2 19:05:22 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Mar 2 19:05:22 2016 -0800
----------------------------------------------------------------------
.../options/DataflowPipelineDebugOptions.java | 2 +-
.../dataflow/sdk/util/PaneInfoTracker.java | 30 +++++++++----------
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 31 ++++++++------------
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 9 +++---
4 files changed, 33 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: [BEAM-80] Decide EARLY or ON_TIME based on input watermark
Posted by ke...@apache.org.
[BEAM-80] Decide EARLY or ON_TIME based on input watermark
This change is based on trigger specs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968494f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968494f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968494f3
Branch: refs/heads/master
Commit: 968494f3e427cc242d91ef6de25f5c7c408540dc
Parents: f87f35b
Author: Pei He <pe...@gmail.com>
Authored: Wed Mar 2 16:28:43 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Mar 2 19:01:09 2016 -0800
----------------------------------------------------------------------
.../dataflow/sdk/util/PaneInfoTracker.java | 30 +++++++++----------
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 31 ++++++++------------
.../dataflow/sdk/util/ReduceFnRunnerTest.java | 9 +++---
3 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index 38499c2..a7818a3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -54,13 +54,11 @@ public class PaneInfoTracker {
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
* info includes the timing for the pane, who's calculation is quite subtle.
*
- * @param isEndOfWindow should be {@code true} only if the pane is being emitted
- * because an end-of-window timer has fired and the trigger agreed we should fire.
* @param isFinal should be {@code true} only if the triggering machinery can guarantee
* no further firings for the
*/
- public ReadableState<PaneInfo> getNextPaneInfo(ReduceFn<?, ?, ?, ?>.Context context,
- final boolean isEndOfWindow, final boolean isFinal) {
+ public ReadableState<PaneInfo> getNextPaneInfo(
+ ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
final Object key = context.key();
final ReadableState<PaneInfo> previousPaneFuture =
context.state().access(PaneInfoTracker.PANE_INFO_TAG);
@@ -76,7 +74,7 @@ public class PaneInfoTracker {
@Override
public PaneInfo read() {
PaneInfo previousPane = previousPaneFuture.read();
- return describePane(key, windowMaxTimestamp, previousPane, isEndOfWindow, isFinal);
+ return describePane(key, windowMaxTimestamp, previousPane, isFinal);
}
};
}
@@ -85,8 +83,8 @@ public class PaneInfoTracker {
context.state().access(PANE_INFO_TAG).write(currentPane);
}
- private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, PaneInfo previousPane,
- boolean isEndOfWindow, boolean isFinal) {
+ private <W> PaneInfo describePane(
+ Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
boolean isFirst = previousPane == null;
Timing previousTiming = isFirst ? null : previousPane.getTiming();
long index = isFirst ? 0 : previousPane.getIndex() + 1;
@@ -104,26 +102,28 @@ public class PaneInfoTracker {
// if the output watermark is behind the end of the window.
boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
+ // True is the input watermark hasn't passed the window's max timestamp.
+ boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+
Timing timing;
if (isLateForOutput || !onlyEarlyPanesSoFar) {
// The output watermark has already passed the end of this window, or we have already
// emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
// consider this pane LATE.
timing = Timing.LATE;
- } else if (isEndOfWindow) {
- // This is the unique ON_TIME firing for the window.
- timing = Timing.ON_TIME;
- } else {
- // All other cases are EARLY.
+ } else if (isEarlyForInput) {
+ // This is an EARLY firing.
timing = Timing.EARLY;
nonSpeculativeIndex = -1;
+ } else {
+ // This is the unique ON_TIME firing for the window.
+ timing = Timing.ON_TIME;
}
WindowTracing.debug(
"describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
- + "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; isLateForOutput:{}",
- timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isEndOfWindow,
- isLateForOutput);
+ + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+ timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
if (previousPane != null) {
// Timing transitions should follow EARLY* ON_TIME? LATE*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index fe5c474..1a009bb 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -289,7 +289,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
contextFactory.base(mergedWindow, StateStyle.RENAMED);
triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
- emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow */);
+ emitIfAppropriate(directContext, renamedContext);
}
// We're all done with merging and emitting elements so can compress the activeWindow state.
@@ -532,14 +532,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
"ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
}
- // If this is an end-of-window timer then:
- // 1. We need to set a GC timer
- // 2. We need to let the PaneInfoTracker know that we are transitioning from early to late,
- // and possibly emitting an on-time pane.
- boolean isEndOfWindow =
- TimeDomain.EVENT_TIME == timer.getDomain()
- && timer.getTimestamp().equals(window.maxTimestamp());
-
// If this is a garbage collection timer then we should trigger and garbage collect the window.
Instant cleanupTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
boolean isGarbageCollection =
@@ -556,7 +548,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// We need to call onTrigger to emit the final pane if required.
// The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
// and the watermark has passed the end of the window.
- onTrigger(directContext, renamedContext, isEndOfWindow, true/* isFinished */);
+ onTrigger(directContext, renamedContext, true/* isFinished */);
}
// Cleanup flavor B: Clear all the remaining state for this window since we'll never
@@ -569,9 +561,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
if (windowIsActive) {
- emitIfAppropriate(directContext, renamedContext, isEndOfWindow);
+ emitIfAppropriate(directContext, renamedContext);
}
+ // If this is an end-of-window timer then, we need to set a GC timer
+ boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+ && timer.getTimestamp().equals(window.maxTimestamp());
if (isEndOfWindow) {
// Since we are processing an on-time firing we should schedule the garbage collection
// timer. (If getAllowedLateness is zero then the timer event will be considered a
@@ -649,7 +644,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* Possibly emit a pane if a trigger is ready to fire or timers require it, and cleanup state.
*/
private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context directContext,
- ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean isEndOfWindow)
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
throws Exception {
if (!triggerRunner.shouldFire(
directContext.window(), directContext.timers(), directContext.state())) {
@@ -667,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
// Run onTrigger to produce the actual pane contents.
// As a side effect it will clear all element holds, but not necessarily any
// end-of-window or garbage collection holds.
- onTrigger(directContext, renamedContext, isEndOfWindow, isFinished);
+ onTrigger(directContext, renamedContext, isFinished);
// Now that we've triggered, the pane is empty.
nonEmptyPanes.clearPane(renamedContext.state());
@@ -692,13 +687,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
/**
* Do we need to emit a pane?
*/
- private boolean needToEmit(
- boolean isEmpty, boolean isEndOfWindow, boolean isFinished, PaneInfo.Timing timing) {
+ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
if (!isEmpty) {
// The pane has elements.
return true;
}
- if (isEndOfWindow && timing == Timing.ON_TIME) {
+ if (timing == Timing.ON_TIME) {
// This is the unique ON_TIME pane.
return true;
}
@@ -715,14 +709,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
private void onTrigger(
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
- boolean isEndOfWindow,
boolean isFinished)
throws Exception {
// Prefetch necessary states
ReadableState<Instant> outputTimestampFuture =
watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
ReadableState<PaneInfo> paneFuture =
- paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, isFinished).readLater();
+ paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
ReadableState<Boolean> isEmptyFuture =
nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
@@ -735,7 +728,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
final Instant outputTimestamp = outputTimestampFuture.read();
// Only emit a pane if it has data or empty panes are observable.
- if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, pane.getTiming())) {
+ if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
// Run reduceFn.onTrigger method.
final List<W> windows = Collections.singletonList(directContext.window());
ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index c85b1ca..4fb3e37 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -479,19 +479,20 @@ public class ReduceFnRunnerTest {
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 3);
assertThat(tester.extractOutput(), contains(
- // This is late, because the trigger wasn't waiting for AfterWatermark
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 2, -1))));
+ WindowMatchers.valueWithPaneInfo(
+ PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
injectElement(tester, 4);
assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 3, -1))));
+ WindowMatchers.valueWithPaneInfo(
+ PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
triggerShouldFinish(mockTrigger);
injectElement(tester, 5);
assertThat(tester.extractOutput(), contains(
- WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.EARLY, 4, -1))));
+ WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2))));
}
@Test