You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 04:54:30 UTC

[2/3] incubator-beam git commit: Add test for InMemoryTimerInternals synchronized processing time timers. Ensure that processing time and synchronized processing time are not null.

Add test for InMemoryTimerInternals synchronized processing time timers.
Ensure that processing time and synchronized processing time are not null.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3434e8a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3434e8a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3434e8a4

Branch: refs/heads/master
Commit: 3434e8a434dea2f0ba5bb6e561bb7f3bd9d5d603
Parents: e9d835e
Author: Sam Whittle <sa...@google.com>
Authored: Mon Dec 5 14:57:29 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Dec 5 20:54:19 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/util/state/InMemoryTimerInternals.java   |  8 +++++---
 .../org/apache/beam/sdk/util/state/TimerCallback.java |  2 +-
 .../sdk/util/state/InMemoryTimerInternalsTest.java    | 14 ++++++++++++++
 3 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3434e8a4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index 60c4a96..159b583 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -50,7 +50,7 @@ public class InMemoryTimerInternals implements TimerInternals {
   private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
 
   /** Current input watermark. */
-  @Nullable private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   /** Current output watermark. */
   @Nullable private Instant outputWatermarkTime = null;
@@ -59,7 +59,7 @@ public class InMemoryTimerInternals implements TimerInternals {
   private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   /** Current synchronized processing time. */
-  @Nullable private Instant synchronizedProcessingTime = null;
+  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
 
   @Override
   @Nullable
@@ -142,7 +142,7 @@ public class InMemoryTimerInternals implements TimerInternals {
 
   @Override
   public Instant currentInputWatermarkTime() {
-    return checkNotNull(inputWatermarkTime);
+    return inputWatermarkTime;
   }
 
   @Override
@@ -197,6 +197,7 @@ public class InMemoryTimerInternals implements TimerInternals {
 
   /** Advances processing time to the given value. */
   public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    checkNotNull(newProcessingTime);
     checkState(
         !newProcessingTime.isBefore(processingTime),
         "Cannot move processing time backwards from %s to %s",
@@ -211,6 +212,7 @@ public class InMemoryTimerInternals implements TimerInternals {
   /** Advances synchronized processing time to the given value. */
   public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
       throws Exception {
+    checkNotNull(newSynchronizedProcessingTime);
     checkState(
         !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
         "Cannot move processing time backwards from %s to %s",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3434e8a4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
index 1d68e36..83791d6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -22,7 +22,7 @@ import org.apache.beam.sdk.util.TimerInternals;
 /**
  * A callback that processes a {@link TimerInternals.TimerData TimerData}.
  *
- * @deprecated Use TimerInternals.advanceTime and removeTimers instead of callback.
+ * @deprecated Use InMemoryTimerInternals advance and remove methods instead of callback.
  */
 @Deprecated
 public interface TimerCallback {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3434e8a4/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
index 87c9aea..1e42864 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -110,23 +110,37 @@ public class InMemoryTimerInternalsTest {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
     TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
     TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime1 = TimerData.of(
+        NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
     TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime2 = TimerData.of(
+        NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
 
     underTest.setTimer(processingTime1);
     underTest.setTimer(eventTime1);
+    underTest.setTimer(synchronizedProcessingTime1);
     underTest.setTimer(processingTime2);
     underTest.setTimer(eventTime2);
+    underTest.setTimer(synchronizedProcessingTime2);
 
+    assertNull(underTest.removeNextEventTimer());
     underTest.advanceInputWatermark(new Instant(30));
     assertEquals(eventTime1, underTest.removeNextEventTimer());
     assertEquals(eventTime2, underTest.removeNextEventTimer());
     assertNull(underTest.removeNextEventTimer());
 
+    assertNull(underTest.removeNextProcessingTimer());
     underTest.advanceProcessingTime(new Instant(30));
     assertEquals(processingTime1, underTest.removeNextProcessingTimer());
     assertEquals(processingTime2, underTest.removeNextProcessingTimer());
     assertNull(underTest.removeNextProcessingTimer());
+
+    assertNull(underTest.removeNextSynchronizedProcessingTimer());
+    underTest.advanceSynchronizedProcessingTime(new Instant(30));
+    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
+    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
   }
 
   @Test