You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/16 23:00:28 UTC
[1/5] beam git commit: Return null when timer not found instead of crashing
Repository: beam
Updated Branches:
refs/heads/master a5cbd764b -> 52863efd7
Return null when timer not found instead of crashing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e940456b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e940456b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e940456b
Branch: refs/heads/master
Commit: e940456bd95da3c8b79eb4666ad09280dccaedcf
Parents: 655227a
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:13:26 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:17:32 2017 -0700
----------------------------------------------------------------------
.../runners/core/InMemoryTimerInternals.java | 28 ++++++++++----------
1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e940456b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index c29ea19..c7b4ac6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.state.TimeDomain;
@@ -71,21 +72,20 @@ public class InMemoryTimerInternals implements TimerInternals {
*/
@Nullable
public Instant getNextTimer(TimeDomain domain) {
- final TimerData data;
- switch (domain) {
- case EVENT_TIME:
- data = watermarkTimers.first();
- break;
- case PROCESSING_TIME:
- data = processingTimers.first();
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- data = synchronizedProcessingTimers.first();
- break;
- default:
- throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ try {
+ switch (domain) {
+ case EVENT_TIME:
+ return watermarkTimers.first().getTimestamp();
+ case PROCESSING_TIME:
+ return processingTimers.first().getTimestamp();
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return synchronizedProcessingTimers.first().getTimestamp();
+ default:
+ throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ }
+ } catch (NoSuchElementException exc) {
+ return null;
}
- return (data == null) ? null : data.getTimestamp();
}
private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {
[5/5] beam git commit: This closes #4001: [BEAM-3052] Set end-of-window timer in AfterWatermarkStateMachine
Posted by ke...@apache.org.
This closes #4001: [BEAM-3052] Set end-of-window timer in AfterWatermarkStateMachine
Do not set EOW timer in TriggerStateMachineTester
Set end-of-window timer in AfterWatermarkStateMachine
Allow checking timers set in TriggerStateMachineTester
Return null when timer not found instead of crashing
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52863efd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52863efd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52863efd
Branch: refs/heads/master
Commit: 52863efd7478d406597b5b162c61701314155f25
Parents: a5cbd76 3d36f63
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 16 15:20:27 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 16 15:20:27 2017 -0700
----------------------------------------------------------------------
.../runners/core/InMemoryTimerInternals.java | 28 +++++++++----------
.../triggers/AfterWatermarkStateMachine.java | 8 +++++-
.../AfterWatermarkStateMachineTest.java | 29 ++++++++++++++++++++
.../triggers/TriggerStateMachineTester.java | 13 ++++-----
4 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
[4/5] beam git commit: Do not set EOW timer in TriggerStateMachineTester
Posted by ke...@apache.org.
Do not set EOW timer in TriggerStateMachineTester
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d36f63c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d36f63c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d36f63c
Branch: refs/heads/master
Commit: 3d36f63cbb6de5ce7115d34a1432835407cd666a
Parents: 45c65c5
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:03:33 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/TriggerStateMachineTester.java | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3d36f63c/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index b41977d..0f38be0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -269,11 +269,6 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
for (W window : assignedWindows) {
activeWindows.addActiveForTesting(window);
-
- // Today, triggers assume onTimer firing at the watermark time, whether or not they
- // explicitly set the timer themselves. So this tester must set it.
- timerInternals.setTimer(
- TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
}
windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
@@ -357,8 +352,6 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
new TestTimers(windowNamespace(mergeResult)), executableTrigger,
getFinishedSet(mergeResult), mergingFinishedSets));
- timerInternals.setTimer(TimerData.of(
- windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
}
});
}
[2/5] beam git commit: Set end-of-window timer in AfterWatermarkStateMachine
Posted by ke...@apache.org.
Set end-of-window timer in AfterWatermarkStateMachine
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45c65c55
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45c65c55
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45c65c55
Branch: refs/heads/master
Commit: 45c65c557bf79f8b4c5975eee15b5763e45882be
Parents: 64fb19d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 12 13:38:27 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700
----------------------------------------------------------------------
.../triggers/AfterWatermarkStateMachine.java | 8 +++++-
.../AfterWatermarkStateMachineTest.java | 29 ++++++++++++++++++++
2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/45c65c55/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 509c96b..2c99722 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -100,6 +100,10 @@ public class AfterWatermarkStateMachine {
@Override
public void onElement(OnElementContext c) throws Exception {
+ if (!endOfWindowReached(c)) {
+ c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ }
+
if (!c.trigger().isMerging()) {
// If merges can never happen, we just run the unfinished subtrigger
c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
@@ -270,7 +274,9 @@ public class AfterWatermarkStateMachine {
// We're interested in knowing when the input watermark passes the end of the window.
// (It is possible this has already happened, in which case the timer will be fired
// almost immediately).
- c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ if (!endOfWindowReached(c)) {
+ c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/45c65c55/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index 45a5cfb..65c8be3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -17,8 +17,11 @@
*/
package org.apache.beam.runners.core.triggers;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.verify;
@@ -26,6 +29,7 @@ import static org.mockito.Mockito.when;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -104,6 +108,31 @@ public class AfterWatermarkStateMachineTest {
}
@Test
+ public void testTimerForEndOfWindow() throws Exception {
+ tester = TriggerStateMachineTester.forTrigger(
+ AfterWatermarkStateMachine.pastEndOfWindow(),
+ FixedWindows.of(Duration.millis(100)));
+
+ assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), nullValue());
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+ assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), equalTo(window.maxTimestamp()));
+ }
+
+ @Test
+ public void testTimerForEndOfWindowCompound() throws Exception {
+ tester =
+ TriggerStateMachineTester.forTrigger(
+ AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(NeverStateMachine.ever()),
+ FixedWindows.of(Duration.millis(100)));
+
+ assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), nullValue());
+ injectElements(1);
+ IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+ assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), equalTo(window.maxTimestamp()));
+ }
+
+ @Test
public void testAtWatermarkAndLate() throws Exception {
tester = TriggerStateMachineTester.forTrigger(
AfterWatermarkStateMachine.pastEndOfWindow()
[3/5] beam git commit: Allow checking timers set in TriggerStateMachineTester
Posted by ke...@apache.org.
Allow checking timers set in TriggerStateMachineTester
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64fb19da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64fb19da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64fb19da
Branch: refs/heads/master
Commit: 64fb19da42e3d26d0a9dae41b19dd7bf77ff49c7
Parents: e940456
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:16:07 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/TriggerStateMachineTester.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/64fb19da/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 9a10f53..b41977d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -198,6 +198,12 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
}
}
+ /** Retrieves the next timer for this time domain, if any, for use in assertions. */
+ @Nullable
+ public Instant getNextTimer(TimeDomain domain) {
+ return timerInternals.getNextTimer(domain);
+ }
+
/**
* Returns {@code true} if the {@link TriggerStateMachine} under test is finished for the given
* window.