You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 19:14:55 UTC
[1/2] incubator-beam git commit: Support set and delete of timer by
ID in InMemoryTimerInternals
Repository: incubator-beam
Updated Branches:
refs/heads/master a9447a225 -> 7ee8c86d3
Support set and delete of timer by ID in InMemoryTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df2e540d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df2e540d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df2e540d
Branch: refs/heads/master
Commit: df2e540d7a7b8444b9ff3b404740d5a3394b7691
Parents: acd2196
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Dec 19 14:01:36 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 11:01:36 2016 -0800
----------------------------------------------------------------------
.../runners/core/InMemoryTimerInternals.java | 65 +++++++----
.../core/InMemoryTimerInternalsTest.java | 112 +++++++++++++------
2 files changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..292ac23 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -17,13 +17,15 @@
*/
package org.apache.beam.runners.core;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import java.util.NavigableSet;
+import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.TimeDomain;
@@ -35,17 +37,17 @@ import org.joda.time.Instant;
/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */
public class InMemoryTimerInternals implements TimerInternals {
- /** At most one timer per timestamp is kept. */
- private Set<TimerData> existingTimers = new HashSet<>();
+ /** The current set timers by namespace and ID. */
+ Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
/** Pending input watermark timers, in timestamp order. */
- private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+ private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
/** Pending processing time timers, in timestamp order. */
- private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+ private NavigableSet<TimerData> processingTimers = new TreeSet<>();
/** Pending synchronized processing time timers, in timestamp order. */
- private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+ private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
/** Current input watermark. */
private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -74,13 +76,13 @@ public class InMemoryTimerInternals implements TimerInternals {
final TimerData data;
switch (domain) {
case EVENT_TIME:
- data = watermarkTimers.peek();
+ data = watermarkTimers.first();
break;
case PROCESSING_TIME:
- data = processingTimers.peek();
+ data = processingTimers.first();
break;
case SYNCHRONIZED_PROCESSING_TIME:
- data = synchronizedProcessingTimers.peek();
+ data = synchronizedProcessingTimers.first();
break;
default:
throw new IllegalArgumentException("Unexpected time domain: " + domain);
@@ -88,7 +90,7 @@ public class InMemoryTimerInternals implements TimerInternals {
return (data == null) ? null : data.getTimestamp();
}
- private PriorityQueue<TimerData> queue(TimeDomain domain) {
+ private NavigableSet<TimerData> timersForDomain(TimeDomain domain) {
switch (domain) {
case EVENT_TIME:
return watermarkTimers;
@@ -104,27 +106,45 @@ public class InMemoryTimerInternals implements TimerInternals {
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant target,
TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
}
@Override
public void setTimer(TimerData timerData) {
WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
- if (existingTimers.add(timerData)) {
- queue(timerData.getDomain()).add(timerData);
+
+ @Nullable
+ TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId());
+ if (existing == null) {
+ existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+ timersForDomain(timerData.getDomain()).add(timerData);
+ } else {
+ checkArgument(timerData.getDomain().equals(existing.getDomain()),
+ "Attempt to set %s for time domain %s, but it is already set for time domain %s",
+ timerData.getTimerId(), timerData.getDomain(), existing.getDomain());
+
+ if (!timerData.getTimestamp().equals(existing.getTimestamp())) {
+ NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain());
+ timers.remove(existing);
+ timers.add(timerData);
+ existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData);
+ }
}
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ TimerData existing = existingTimers.get(namespace, timerId);
+ if (existing != null) {
+ deleteTimer(existing);
+ }
}
@Override
public void deleteTimer(TimerData timer) {
WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
- existingTimers.remove(timer);
- queue(timer.getDomain()).remove(timer);
+ existingTimers.remove(timer.getNamespace(), timer.getTimerId());
+ timersForDomain(timer.getDomain()).remove(timer);
}
@Override
@@ -261,10 +281,11 @@ public class InMemoryTimerInternals implements TimerInternals {
@Nullable
private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
- PriorityQueue<TimerData> queue = queue(domain);
- if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
- TimerData timer = queue.remove();
- existingTimers.remove(timer);
+ NavigableSet<TimerData> timers = timersForDomain(domain);
+
+ if (!timers.isEmpty() && currentTime.isAfter(timers.first().getTimestamp())) {
+ TimerData timer = timers.pollFirst();
+ existingTimers.remove(timer.getNamespace(), timer.getTimerId());
return timer;
} else {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 2caa874..e711285 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -19,8 +19,6 @@ package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.util.TimeDomain;
@@ -39,37 +37,79 @@ import org.junit.runners.JUnit4;
public class InMemoryTimerInternalsTest {
private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+ private static final String ID1 = "id1";
+ private static final String ID2 = "id2";
@Test
- public void testFiringTimers() throws Exception {
+ public void testFiringEventTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+ TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME);
- underTest.setTimer(processingTime1);
- underTest.setTimer(processingTime2);
+ underTest.setTimer(eventTimer1);
+ underTest.setTimer(eventTimer2);
- underTest.advanceProcessingTime(new Instant(20));
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
+ underTest.advanceInputWatermark(new Instant(20));
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
// Advancing just a little shouldn't refire
- underTest.advanceProcessingTime(new Instant(21));
- assertNull(underTest.removeNextProcessingTimer());
+ underTest.advanceInputWatermark(new Instant(21));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
// Adding the timer and advancing a little should refire
- underTest.setTimer(processingTime1);
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
+ underTest.setTimer(eventTimer1);
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
// And advancing the rest of the way should still have the other timer
- underTest.advanceProcessingTime(new Instant(30));
- assertEquals(processingTime2, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
+ underTest.advanceInputWatermark(new Instant(30));
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer2));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
+ }
+
+ @Test
+ public void testResetById() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ Instant earlyTimestamp = new Instant(13);
+ Instant laterTimestamp = new Instant(42);
+
+ underTest.advanceInputWatermark(new Instant(0));
+ underTest.setTimer(NS1, ID1, earlyTimestamp, TimeDomain.EVENT_TIME);
+ underTest.setTimer(NS1, ID1, laterTimestamp, TimeDomain.EVENT_TIME);
+ underTest.advanceInputWatermark(earlyTimestamp.plus(1L));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
+
+ underTest.advanceInputWatermark(laterTimestamp.plus(1L));
+ assertThat(
+ underTest.removeNextEventTimer(),
+ equalTo(TimerData.of(ID1, NS1, laterTimestamp, TimeDomain.EVENT_TIME)));
+ }
+
+ @Test
+ public void testDeletionIdempotent() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ Instant timestamp = new Instant(42);
+ underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+ underTest.deleteTimer(NS1, ID1);
+ underTest.deleteTimer(NS1, ID1);
}
@Test
- public void testFiringTimersWithCallback() throws Exception {
+ public void testDeletionById() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ Instant timestamp = new Instant(42);
+
+ underTest.advanceInputWatermark(new Instant(0));
+ underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+ underTest.deleteTimer(NS1, ID1);
+ underTest.advanceInputWatermark(new Instant(43));
+
+ assertThat(underTest.removeNextEventTimer(), nullValue());
+ }
+
+ @Test
+ public void testFiringProcessingTimeTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
@@ -116,23 +156,25 @@ public class InMemoryTimerInternalsTest {
underTest.setTimer(eventTime2);
underTest.setTimer(synchronizedProcessingTime2);
- assertNull(underTest.removeNextEventTimer());
+ assertThat(underTest.removeNextEventTimer(), nullValue());
underTest.advanceInputWatermark(new Instant(30));
- assertEquals(eventTime1, underTest.removeNextEventTimer());
- assertEquals(eventTime2, underTest.removeNextEventTimer());
- assertNull(underTest.removeNextEventTimer());
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTime1));
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTime2));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
- assertNull(underTest.removeNextProcessingTimer());
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
underTest.advanceProcessingTime(new Instant(30));
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertEquals(processingTime2, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
- assertNull(underTest.removeNextSynchronizedProcessingTimer());
+ assertThat(underTest.removeNextSynchronizedProcessingTimer(), nullValue());
underTest.advanceSynchronizedProcessingTime(new Instant(30));
- assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
- assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
+ assertThat(
+ underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime1));
+ assertThat(
+ underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime2));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
}
@Test
@@ -147,9 +189,9 @@ public class InMemoryTimerInternalsTest {
underTest.advanceProcessingTime(new Instant(20));
underTest.advanceInputWatermark(new Instant(20));
- assertEquals(processingTime, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
- assertEquals(eventTime, underTest.removeNextEventTimer());
- assertNull(underTest.removeNextEventTimer());
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
+ assertThat(underTest.removeNextEventTimer(), equalTo(eventTime));
+ assertThat(underTest.removeNextEventTimer(), nullValue());
}
}
[2/2] incubator-beam git commit: This closes #1160: Support set and
delete of timer by ID in InMemoryTimerInternals
Posted by ke...@apache.org.
This closes #1160: Support set and delete of timer by ID in InMemoryTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ee8c86d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ee8c86d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ee8c86d
Branch: refs/heads/master
Commit: 7ee8c86d3b0553d8cb7de60b0dc1a03103dfbbc5
Parents: a9447a2 df2e540
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 11:02:02 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 11:02:02 2016 -0800
----------------------------------------------------------------------
.../runners/core/InMemoryTimerInternals.java | 65 +++++++----
.../core/InMemoryTimerInternalsTest.java | 112 +++++++++++++------
2 files changed, 120 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ee8c86d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --cc runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5ddd5a7,292ac23..2c3d78a
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@@ -104,10 -106,9 +106,10 @@@ public class InMemoryTimerInternals imp
@Override
public void setTimer(StateNamespace namespace, String timerId, Instant target,
TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ setTimer(TimerData.of(timerId, namespace, target, timeDomain));
}
+ @Deprecated
@Override
public void setTimer(TimerData timerData) {
WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
@@@ -117,17 -133,13 +134,20 @@@
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ TimerData existing = existingTimers.get(namespace, timerId);
+ if (existing != null) {
+ deleteTimer(existing);
+ }
}
+ @Deprecated
@Override
public void deleteTimer(TimerData timer) {
WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);