You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/10/16 23:00:28 UTC

[1/5] beam git commit: Return null when timer not found instead of crashing

Repository: beam
Updated Branches:
  refs/heads/master a5cbd764b -> 52863efd7


Return null when timer not found instead of crashing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e940456b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e940456b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e940456b

Branch: refs/heads/master
Commit: e940456bd95da3c8b79eb4666ad09280dccaedcf
Parents: 655227a
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:13:26 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:17:32 2017 -0700

----------------------------------------------------------------------
 .../runners/core/InMemoryTimerInternals.java    | 28 ++++++++++----------
 1 file changed, 14 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e940456b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index c29ea19..c7b4ac6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -25,6 +25,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -71,21 +72,20 @@ public class InMemoryTimerInternals implements TimerInternals {
    */
   @Nullable
   public Instant getNextTimer(TimeDomain domain) {
-    final TimerData data;
-    switch (domain) {
-      case EVENT_TIME:
-        data = watermarkTimers.first();
-        break;
-      case PROCESSING_TIME:
-        data = processingTimers.first();
-        break;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.first();
-        break;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
+    try {
+      switch (domain) {
+        case EVENT_TIME:
+          return watermarkTimers.first().getTimestamp();
+        case PROCESSING_TIME:
+          return processingTimers.first().getTimestamp();
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return synchronizedProcessingTimers.first().getTimestamp();
+        default:
+          throw new IllegalArgumentException("Unexpected time domain: " + domain);
+      }
+    } catch (NoSuchElementException exc) {
+      return null;
     }
-    return (data == null) ? null : data.getTimestamp();
   }
 
   private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {


[5/5] beam git commit: This closes #4001: [BEAM-3052] Set end-of-window timer in AfterWatermarkStateMachine

Posted by ke...@apache.org.
This closes #4001: [BEAM-3052] Set end-of-window timer in AfterWatermarkStateMachine

  Do not set EOW timer in TriggerStateMachineTester
  Set end-of-window timer in AfterWatermarkStateMachine
  Allow checking timers set in TriggerStateMachineTester
  Return null when timer not found instead of crashing


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/52863efd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/52863efd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/52863efd

Branch: refs/heads/master
Commit: 52863efd7478d406597b5b162c61701314155f25
Parents: a5cbd76 3d36f63
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 16 15:20:27 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 16 15:20:27 2017 -0700

----------------------------------------------------------------------
 .../runners/core/InMemoryTimerInternals.java    | 28 +++++++++----------
 .../triggers/AfterWatermarkStateMachine.java    |  8 +++++-
 .../AfterWatermarkStateMachineTest.java         | 29 ++++++++++++++++++++
 .../triggers/TriggerStateMachineTester.java     | 13 ++++-----
 4 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------



[4/5] beam git commit: Do not set EOW timer in TriggerStateMachineTester

Posted by ke...@apache.org.
Do not set EOW timer in TriggerStateMachineTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d36f63c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d36f63c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d36f63c

Branch: refs/heads/master
Commit: 3d36f63cbb6de5ce7115d34a1432835407cd666a
Parents: 45c65c5
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:03:33 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/TriggerStateMachineTester.java | 7 -------
 1 file changed, 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3d36f63c/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index b41977d..0f38be0 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -269,11 +269,6 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
 
         for (W window : assignedWindows) {
           activeWindows.addActiveForTesting(window);
-
-          // Today, triggers assume onTimer firing at the watermark time, whether or not they
-          // explicitly set the timer themselves. So this tester must set it.
-          timerInternals.setTimer(
-              TimerData.of(windowNamespace(window), window.maxTimestamp(), TimeDomain.EVENT_TIME));
         }
 
         windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, PaneInfo.NO_FIRING));
@@ -357,8 +352,6 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
         executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
             new TestTimers(windowNamespace(mergeResult)), executableTrigger,
             getFinishedSet(mergeResult), mergingFinishedSets));
-        timerInternals.setTimer(TimerData.of(
-            windowNamespace(mergeResult), mergeResult.maxTimestamp(), TimeDomain.EVENT_TIME));
       }
     });
   }


[2/5] beam git commit: Set end-of-window timer in AfterWatermarkStateMachine

Posted by ke...@apache.org.
Set end-of-window timer in AfterWatermarkStateMachine


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/45c65c55
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/45c65c55
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/45c65c55

Branch: refs/heads/master
Commit: 45c65c557bf79f8b4c5975eee15b5763e45882be
Parents: 64fb19d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 12 13:38:27 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700

----------------------------------------------------------------------
 .../triggers/AfterWatermarkStateMachine.java    |  8 +++++-
 .../AfterWatermarkStateMachineTest.java         | 29 ++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/45c65c55/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 509c96b..2c99722 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -100,6 +100,10 @@ public class AfterWatermarkStateMachine {
 
     @Override
     public void onElement(OnElementContext c) throws Exception {
+      if (!endOfWindowReached(c)) {
+        c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+      }
+
       if (!c.trigger().isMerging()) {
         // If merges can never happen, we just run the unfinished subtrigger
         c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
@@ -270,7 +274,9 @@ public class AfterWatermarkStateMachine {
       // We're interested in knowing when the input watermark passes the end of the window.
       // (It is possible this has already happened, in which case the timer will be fired
       // almost immediately).
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+      if (!endOfWindowReached(c)) {
+        c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/45c65c55/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index 45a5cfb..65c8be3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.beam.runners.core.triggers;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.verify;
@@ -26,6 +29,7 @@ import static org.mockito.Mockito.when;
 
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -104,6 +108,31 @@ public class AfterWatermarkStateMachineTest {
   }
 
   @Test
+  public void testTimerForEndOfWindow() throws Exception {
+    tester = TriggerStateMachineTester.forTrigger(
+        AfterWatermarkStateMachine.pastEndOfWindow(),
+        FixedWindows.of(Duration.millis(100)));
+
+    assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), nullValue());
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+    assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), equalTo(window.maxTimestamp()));
+  }
+
+  @Test
+  public void testTimerForEndOfWindowCompound() throws Exception {
+    tester =
+        TriggerStateMachineTester.forTrigger(
+            AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(NeverStateMachine.ever()),
+            FixedWindows.of(Duration.millis(100)));
+
+    assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), nullValue());
+    injectElements(1);
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100));
+    assertThat(tester.getNextTimer(TimeDomain.EVENT_TIME), equalTo(window.maxTimestamp()));
+  }
+
+  @Test
   public void testAtWatermarkAndLate() throws Exception {
     tester = TriggerStateMachineTester.forTrigger(
         AfterWatermarkStateMachine.pastEndOfWindow()


[3/5] beam git commit: Allow checking timers set in TriggerStateMachineTester

Posted by ke...@apache.org.
Allow checking timers set in TriggerStateMachineTester


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64fb19da
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64fb19da
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64fb19da

Branch: refs/heads/master
Commit: 64fb19da42e3d26d0a9dae41b19dd7bf77ff49c7
Parents: e940456
Author: Kenneth Knowles <ke...@apache.org>
Authored: Mon Oct 16 15:16:07 2017 -0700
Committer: Kenneth Knowles <ke...@apache.org>
Committed: Mon Oct 16 15:19:14 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/TriggerStateMachineTester.java  | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/64fb19da/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 9a10f53..b41977d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -198,6 +198,12 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
     }
   }
 
+  /** Retrieves the next timer for this time domain, if any, for use in assertions. */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    return timerInternals.getNextTimer(domain);
+  }
+
   /**
    * Returns {@code true} if the {@link TriggerStateMachine} under test is finished for the given
    * window.