You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:50:12 UTC

[38/51] [abbrv] incubator-beam git commit: Support set and delete of timer by ID in InMemoryTimerInternals

Support set and delete of timer by ID in InMemoryTimerInternals


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df2e540d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df2e540d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df2e540d

Branch: refs/heads/python-sdk
Commit: df2e540d7a7b8444b9ff3b404740d5a3394b7691
Parents: acd2196
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 19 14:01:36 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 11:01:36 2016 -0800

----------------------------------------------------------------------
 .../runners/core/InMemoryTimerInternals.java    |  65 +++++++----
 .../core/InMemoryTimerInternalsTest.java        | 112 +++++++++++++------
 2 files changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..292ac23 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -17,13 +17,15 @@
  */
 package org.apache.beam.runners.core;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.NavigableSet;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
@@ -35,17 +37,17 @@ import org.joda.time.Instant;
 /** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */
 public class InMemoryTimerInternals implements TimerInternals {
 
-  /** At most one timer per timestamp is kept. */
-  private Set<TimerData> existingTimers = new HashSet<>();
+  /** The currently set timers, keyed by namespace and timer ID. */
+  Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+  private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -74,13 +76,13 @@ public class InMemoryTimerInternals implements TimerInternals {
     final TimerData data;
     switch (domain) {
       case EVENT_TIME:
-        data = watermarkTimers.peek();
+        data = watermarkTimers.first();
         break;
       case PROCESSING_TIME:
-        data = processingTimers.peek();
+        data = processingTimers.first();
         break;
       case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.peek();
+        data = synchronizedProcessingTimers.first();
         break;
       default:
         throw new IllegalArgumentException("Unexpected time domain: " + domain);
@@ -88,7 +90,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     return (data == null) ? null : data.getTimestamp();
   }
 
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+  private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {
     switch (domain) {
       case EVENT_TIME:
         return watermarkTimers;
@@ -104,27 +106,45 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Override
   public void setTimer(StateNamespace namespace, String timerId, Instant target,
       TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
   }
 
   @Override
   public void setTimer(TimerData timerData) {
     WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
-    if (existingTimers.add(timerData)) {
-      queue(timerData.getDomain()).add(timerData);
+
+    @Nullable
+    TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId());
+    if (existing == null) {
+      existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+      timersForDomain(timerData.getDomain()).add(timerData);
+    } else {
+      checkArgument(timerData.getDomain().equals(existing.getDomain()),
+          "Attempt to set %s for time domain %s, but it is already set for time domain %s",
+          timerData.getTimerId(), timerData.getDomain(), existing.getDomain());
+
+      if (!timerData.getTimestamp().equals(existing.getTimestamp())) {
+        NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain());
+        timers.remove(existing);
+        timers.add(timerData);
+        existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+      }
     }
   }
 
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+    TimerData existing = existingTimers.get(namespace, timerId);
+    if (existing != null) {
+      deleteTimer(existing);
+    }
   }
 
   @Override
   public void deleteTimer(TimerData timer) {
     WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
+    existingTimers.remove(timer.getNamespace(), timer.getTimerId());
+    timersForDomain(timer.getDomain()).remove(timer);
   }
 
   @Override
@@ -261,10 +281,11 @@ public class InMemoryTimerInternals implements TimerInternals {
 
   @Nullable
   private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
-    PriorityQueue<TimerData> queue = queue(domain);
-    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
-      TimerData timer = queue.remove();
-      existingTimers.remove(timer);
+    NavigableSet<TimerData> timers = timersForDomain(domain);
+
+    if (!timers.isEmpty() && currentTime.isAfter(timers.first().getTimestamp())) {
+      TimerData timer = timers.pollFirst();
+      existingTimers.remove(timer.getNamespace(), timer.getTimerId());
       return timer;
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 2caa874..e711285 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.util.TimeDomain;
@@ -39,37 +37,79 @@ import org.junit.runners.JUnit4;
 public class InMemoryTimerInternalsTest {
 
   private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+  private static final String ID1 = "id1";
+  private static final String ID2 = "id2";
 
   @Test
-  public void testFiringTimers() throws Exception {
+  public void testFiringEventTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+    TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME);
 
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
+    underTest.setTimer(eventTimer1);
+    underTest.setTimer(eventTimer2);
 
-    underTest.advanceProcessingTime(new Instant(20));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
+    underTest.advanceInputWatermark(new Instant(20));
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
 
     // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(new Instant(21));
-    assertNull(underTest.removeNextProcessingTimer());
+    underTest.advanceInputWatermark(new Instant(21));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
 
     // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
+    underTest.setTimer(eventTimer1);
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
 
     // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
+    underTest.advanceInputWatermark(new Instant(30));
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer2));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
+  }
+
+  @Test
+  public void testResetById() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    Instant earlyTimestamp = new Instant(13);
+    Instant laterTimestamp = new Instant(42);
+
+    underTest.advanceInputWatermark(new Instant(0));
+    underTest.setTimer(NS1, ID1, earlyTimestamp, TimeDomain.EVENT_TIME);
+    underTest.setTimer(NS1, ID1, laterTimestamp, TimeDomain.EVENT_TIME);
+    underTest.advanceInputWatermark(earlyTimestamp.plus(1L));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
+
+    underTest.advanceInputWatermark(laterTimestamp.plus(1L));
+    assertThat(
+        underTest.removeNextEventTimer(),
+        equalTo(TimerData.of(ID1, NS1, laterTimestamp, TimeDomain.EVENT_TIME)));
+  }
+
+  @Test
+  public void testDeletionIdempotent() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    Instant timestamp = new Instant(42);
+    underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+    underTest.deleteTimer(NS1, ID1);
+    underTest.deleteTimer(NS1, ID1);
   }
 
   @Test
-  public void testFiringTimersWithCallback() throws Exception {
+  public void testDeletionById() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    Instant timestamp = new Instant(42);
+
+    underTest.advanceInputWatermark(new Instant(0));
+    underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+    underTest.deleteTimer(NS1, ID1);
+    underTest.advanceInputWatermark(new Instant(43));
+
+    assertThat(underTest.removeNextEventTimer(), nullValue());
+  }
+
+  @Test
+  public void testFiringProcessingTimeTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
     TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
     TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
@@ -116,23 +156,25 @@ public class InMemoryTimerInternalsTest {
     underTest.setTimer(eventTime2);
     underTest.setTimer(synchronizedProcessingTime2);
 
-    assertNull(underTest.removeNextEventTimer());
+    assertThat(underTest.removeNextEventTimer(), nullValue());
     underTest.advanceInputWatermark(new Instant(30));
-    assertEquals(eventTime1, underTest.removeNextEventTimer());
-    assertEquals(eventTime2, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTime1));
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTime2));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
 
-    assertNull(underTest.removeNextProcessingTimer());
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
     underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
 
-    assertNull(underTest.removeNextSynchronizedProcessingTimer());
+    assertThat(underTest.removeNextSynchronizedProcessingTimer(), nullValue());
     underTest.advanceSynchronizedProcessingTime(new Instant(30));
-    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
-    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
+    assertThat(
+        underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime1));
+    assertThat(
+        underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime2));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
   }
 
   @Test
@@ -147,9 +189,9 @@ public class InMemoryTimerInternalsTest {
     underTest.advanceProcessingTime(new Instant(20));
     underTest.advanceInputWatermark(new Instant(20));
 
-    assertEquals(processingTime, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-    assertEquals(eventTime, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+    assertThat(underTest.removeNextEventTimer(), equalTo(eventTime));
+    assertThat(underTest.removeNextEventTimer(), nullValue());
   }
 }