You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:49:54 UTC
[20/51] [abbrv] incubator-beam git commit: Move
InMemoryTimerInternals to runners-core
Move InMemoryTimerInternals to runners-core
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205
Branch: refs/heads/python-sdk
Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb
Parents: 22e25a4
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 15 20:45:56 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 11:21:52 2016 -0800
----------------------------------------------------------------------
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 -
.../runners/core/InMemoryTimerInternals.java | 273 ++++++++++++++++++
.../core/InMemoryTimerInternalsTest.java | 155 +++++++++++
.../beam/runners/core/ReduceFnTester.java | 1 -
.../beam/runners/core/SplittableParDoTest.java | 16 +-
.../triggers/TriggerStateMachineTester.java | 2 +-
.../translation/SparkGroupAlsoByWindowFn.java | 2 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 36 ---
.../sdk/util/state/InMemoryTimerInternals.java | 275 -------------------
.../util/state/InMemoryTimerInternalsTest.java | 153 -----------
10 files changed, 443 insertions(+), 471 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 9189191..efcd771 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
new file mode 100644
index 0000000..5fcd088
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.joda.time.Instant;
+
+/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */
+public class InMemoryTimerInternals implements TimerInternals {
+
+ /** At most one timer per timestamp is kept. */
+ private Set<TimerData> existingTimers = new HashSet<>();
+
+ /** Pending input watermark timers, in timestamp order. */
+ private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+ /** Pending processing time timers, in timestamp order. */
+ private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+ /** Pending synchronized processing time timers, in timestamp order. */
+ private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+
+ /** Current input watermark. */
+ private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ /** Current output watermark. */
+ @Nullable private Instant outputWatermarkTime = null;
+
+ /** Current processing time. */
+ private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ /** Current synchronized processing time. */
+ private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ return outputWatermarkTime;
+ }
+
+ /**
+ * Returns when the next timer in the given time domain will fire, or {@code null}
+ * if there are no timers scheduled in that time domain.
+ */
+ @Nullable
+ public Instant getNextTimer(TimeDomain domain) {
+ final TimerData data;
+ switch (domain) {
+ case EVENT_TIME:
+ data = watermarkTimers.peek();
+ break;
+ case PROCESSING_TIME:
+ data = processingTimers.peek();
+ break;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ data = synchronizedProcessingTimers.peek();
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ }
+ return (data == null) ? null : data.getTimestamp();
+ }
+
+ private PriorityQueue<TimerData> queue(TimeDomain domain) {
+ switch (domain) {
+ case EVENT_TIME:
+ return watermarkTimers;
+ case PROCESSING_TIME:
+ return processingTimers;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return synchronizedProcessingTimers;
+ default:
+ throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ }
+ }
+
+ @Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ }
+
+ @Override
+ public void setTimer(TimerData timerData) {
+ WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
+ if (existingTimers.add(timerData)) {
+ queue(timerData.getDomain()).add(timerData);
+ }
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ }
+
+ @Override
+ public void deleteTimer(TimerData timer) {
+ WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
+ existingTimers.remove(timer);
+ queue(timer.getDomain()).remove(timer);
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return processingTime;
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return synchronizedProcessingTime;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return inputWatermarkTime;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("watermarkTimers", watermarkTimers)
+ .add("processingTimers", processingTimers)
+ .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
+ .add("inputWatermarkTime", inputWatermarkTime)
+ .add("outputWatermarkTime", outputWatermarkTime)
+ .add("processingTime", processingTime)
+ .toString();
+ }
+
+ /** Advances input watermark to the given value. */
+ public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+ checkNotNull(newInputWatermark);
+ checkState(
+ !newInputWatermark.isBefore(inputWatermarkTime),
+ "Cannot move input watermark time backwards from %s to %s",
+ inputWatermarkTime,
+ newInputWatermark);
+ WindowTracing.trace(
+ "{}.advanceInputWatermark: from {} to {}",
+ getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
+ inputWatermarkTime = newInputWatermark;
+ }
+
+ /** Advances output watermark to the given value. */
+ public void advanceOutputWatermark(Instant newOutputWatermark) {
+ checkNotNull(newOutputWatermark);
+ final Instant adjustedOutputWatermark;
+ if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+ WindowTracing.trace(
+ "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
+ getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
+ adjustedOutputWatermark = inputWatermarkTime;
+ } else {
+ adjustedOutputWatermark = newOutputWatermark;
+ }
+
+ checkState(
+ outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
+ "Cannot move output watermark time backwards from %s to %s",
+ outputWatermarkTime,
+ adjustedOutputWatermark);
+ WindowTracing.trace(
+ "{}.advanceOutputWatermark: from {} to {}",
+ getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
+ outputWatermarkTime = adjustedOutputWatermark;
+ }
+
+ /** Advances processing time to the given value. */
+ public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+ checkNotNull(newProcessingTime);
+ checkState(
+ !newProcessingTime.isBefore(processingTime),
+ "Cannot move processing time backwards from %s to %s",
+ processingTime,
+ newProcessingTime);
+ WindowTracing.trace(
+ "{}.advanceProcessingTime: from {} to {}",
+ getClass().getSimpleName(), processingTime, newProcessingTime);
+ processingTime = newProcessingTime;
+ }
+
+ /** Advances synchronized processing time to the given value. */
+ public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
+ throws Exception {
+ checkNotNull(newSynchronizedProcessingTime);
+ checkState(
+ !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+ "Cannot move processing time backwards from %s to %s",
+ synchronizedProcessingTime,
+ newSynchronizedProcessingTime);
+ WindowTracing.trace(
+ "{}.advanceProcessingTime: from {} to {}",
+ getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
+ synchronizedProcessingTime = newSynchronizedProcessingTime;
+ }
+
+ /** Returns the next eligible event time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextEventTimer() {
+ TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextEventTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, inputWatermarkTime);
+ }
+ return timer;
+ }
+
+ /** Returns the next eligible processing time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextProcessingTimer() {
+ TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextProcessingTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, processingTime);
+ }
+ return timer;
+ }
+
+ /** Returns the next eligible synchronized processing time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextSynchronizedProcessingTimer() {
+ TimerData timer = removeNextTimer(
+ synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, synchronizedProcessingTime);
+ }
+ return timer;
+ }
+
+ @Nullable
+ private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
+ PriorityQueue<TimerData> queue = queue(domain);
+ if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+ TimerData timer = queue.remove();
+ existingTimers.remove(timer);
+ return timer;
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
new file mode 100644
index 0000000..2caa874
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InMemoryTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryTimerInternalsTest {
+
+ private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+ @Test
+ public void testFiringTimers() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+ underTest.setTimer(processingTime1);
+ underTest.setTimer(processingTime2);
+
+ underTest.advanceProcessingTime(new Instant(20));
+ assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+
+ // Advancing just a little shouldn't refire
+ underTest.advanceProcessingTime(new Instant(21));
+ assertNull(underTest.removeNextProcessingTimer());
+
+ // Adding the timer and advancing a little should refire
+ underTest.setTimer(processingTime1);
+ assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+
+ // And advancing the rest of the way should still have the other timer
+ underTest.advanceProcessingTime(new Instant(30));
+ assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+ }
+
+ @Test
+ public void testFiringTimersWithCallback() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+ underTest.setTimer(processingTime1);
+ underTest.setTimer(processingTime2);
+
+ underTest.advanceProcessingTime(new Instant(20));
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+ // Advancing just a little shouldn't refire
+ underTest.advanceProcessingTime(new Instant(21));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+ // Adding the timer and advancing a little should fire again
+ underTest.setTimer(processingTime1);
+ underTest.advanceProcessingTime(new Instant(21));
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+ // And advancing the rest of the way should still have the other timer
+ underTest.advanceProcessingTime(new Instant(30));
+ assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
+ assertThat(underTest.removeNextProcessingTimer(), nullValue());
+ }
+
+ @Test
+ public void testTimerOrdering() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData synchronizedProcessingTime1 = TimerData.of(
+ NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
+ TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+ TimerData synchronizedProcessingTime2 = TimerData.of(
+ NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+ underTest.setTimer(processingTime1);
+ underTest.setTimer(eventTime1);
+ underTest.setTimer(synchronizedProcessingTime1);
+ underTest.setTimer(processingTime2);
+ underTest.setTimer(eventTime2);
+ underTest.setTimer(synchronizedProcessingTime2);
+
+ assertNull(underTest.removeNextEventTimer());
+ underTest.advanceInputWatermark(new Instant(30));
+ assertEquals(eventTime1, underTest.removeNextEventTimer());
+ assertEquals(eventTime2, underTest.removeNextEventTimer());
+ assertNull(underTest.removeNextEventTimer());
+
+ assertNull(underTest.removeNextProcessingTimer());
+ underTest.advanceProcessingTime(new Instant(30));
+ assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+ assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+
+ assertNull(underTest.removeNextSynchronizedProcessingTimer());
+ underTest.advanceSynchronizedProcessingTime(new Instant(30));
+ assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
+ assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+ }
+
+ @Test
+ public void testDeduplicate() throws Exception {
+ InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+ TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ underTest.setTimer(eventTime);
+ underTest.setTimer(eventTime);
+ underTest.setTimer(processingTime);
+ underTest.setTimer(processingTime);
+ underTest.advanceProcessingTime(new Instant(20));
+ underTest.advanceInputWatermark(new Instant(20));
+
+ assertEquals(processingTime, underTest.removeNextProcessingTimer());
+ assertNull(underTest.removeNextProcessingTimer());
+ assertEquals(eventTime, underTest.removeNextEventTimer());
+ assertNull(underTest.removeNextEventTimer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index db0cf91..890195a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 0f0b106..74a566b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -196,6 +197,8 @@ public class SplittableParDoTest {
tester;
private Instant currentProcessingTime;
+ private InMemoryTimerInternals timerInternals;
+
ProcessFnTester(
Instant currentProcessingTime,
DoFn<InputT, OutputT> fn,
@@ -206,6 +209,7 @@ public class SplittableParDoTest {
new SplittableParDo.ProcessFn<>(
fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
this.tester = DoFnTester.of(processFn);
+ this.timerInternals = new InMemoryTimerInternals();
processFn.setStateInternalsFactory(
new StateInternalsFactory<String>() {
@Override
@@ -217,7 +221,7 @@ public class SplittableParDoTest {
new TimerInternalsFactory<String>() {
@Override
public TimerInternals timerInternalsForKey(String key) {
- return tester.getTimerInternals();
+ return timerInternals;
}
});
processFn.setOutputWindowedValue(
@@ -253,7 +257,7 @@ public class SplittableParDoTest {
// through the state/timer/output callbacks.
this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
this.tester.startBundle();
- this.tester.advanceProcessingTime(currentProcessingTime);
+ timerInternals.advanceProcessingTime(currentProcessingTime);
this.currentProcessingTime = currentProcessingTime;
}
@@ -291,7 +295,13 @@ public class SplittableParDoTest {
*/
boolean advanceProcessingTimeBy(Duration duration) throws Exception {
currentProcessingTime = currentProcessingTime.plus(duration);
- List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
+ timerInternals.advanceProcessingTime(currentProcessingTime);
+
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ TimerInternals.TimerData nextTimer;
+ while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) {
+ timers.add(nextTimer);
+ }
if (timers.isEmpty()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index be63c06..2a626d4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -34,6 +34,7 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.ActiveWindowSet;
import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.MergingActiveWindowSet;
import org.apache.beam.runners.core.NonMergingActiveWindowSet;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.util.Timers;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 87d3f50..5432d58 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 93b3f59..2d8684a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -46,12 +46,10 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -143,10 +141,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return (StateInternals<K>) stateInternals;
}
- public TimerInternals getTimerInternals() {
- return timerInternals;
- }
-
/**
* When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage
* the lifecycle of the {@link DoFn}.
@@ -233,7 +227,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
context.setupDelegateAggregators();
// State and timer internals are per-bundle.
stateInternals = InMemoryStateInternals.forKey(new Object());
- timerInternals = new InMemoryTimerInternals();
try {
fnInvoker.invokeStartBundle(context);
} catch (UserCodeException e) {
@@ -542,34 +535,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return extractAggregatorValue(agg.getName(), agg.getCombineFn());
}
- public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) {
- try {
- timerInternals.advanceInputWatermark(newWatermark);
- final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
- TimerInternals.TimerData timer;
- while ((timer = timerInternals.removeNextEventTimer()) != null) {
- firedTimers.add(timer);
- }
- return firedTimers;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) {
- try {
- timerInternals.advanceProcessingTime(newProcessingTime);
- final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
- TimerInternals.TimerData timer;
- while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
- firedTimers.add(timer);
- }
- return firedTimers;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
private <AccumT, AggregateT> AggregateT extractAggregatorValue(
String name, CombineFn<?, AccumT, AggregateT> combiner) {
@SuppressWarnings("unchecked")
@@ -814,7 +779,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
private InMemoryStateInternals<?> stateInternals;
- private InMemoryTimerInternals timerInternals;
/** The state of processing of the {@link DoFn} under test. */
private State state = State.UNINITIALIZED;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
deleted file mode 100644
index 44b44f0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowTracing;
-import org.joda.time.Instant;
-
-/**
- * Simulates the firing of timers and progression of input and output watermarks for a single
- * computation and key in a Windmill-like streaming environment.
- */
-public class InMemoryTimerInternals implements TimerInternals {
-
- /** At most one timer per timestamp is kept. */
- private Set<TimerData> existingTimers = new HashSet<>();
-
- /** Pending input watermark timers, in timestamp order. */
- private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
- /** Pending processing time timers, in timestamp order. */
- private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
- /** Pending synchronized processing time timers, in timestamp order. */
- private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
-
- /** Current input watermark. */
- private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- /** Current output watermark. */
- @Nullable private Instant outputWatermarkTime = null;
-
- /** Current processing time. */
- private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- /** Current synchronized processing time. */
- private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
- @Override
- @Nullable
- public Instant currentOutputWatermarkTime() {
- return outputWatermarkTime;
- }
-
- /**
- * Returns when the next timer in the given time domain will fire, or {@code null}
- * if there are no timers scheduled in that time domain.
- */
- @Nullable
- public Instant getNextTimer(TimeDomain domain) {
- final TimerData data;
- switch (domain) {
- case EVENT_TIME:
- data = watermarkTimers.peek();
- break;
- case PROCESSING_TIME:
- data = processingTimers.peek();
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- data = synchronizedProcessingTimers.peek();
- break;
- default:
- throw new IllegalArgumentException("Unexpected time domain: " + domain);
- }
- return (data == null) ? null : data.getTimestamp();
- }
-
- private PriorityQueue<TimerData> queue(TimeDomain domain) {
- switch (domain) {
- case EVENT_TIME:
- return watermarkTimers;
- case PROCESSING_TIME:
- return processingTimers;
- case SYNCHRONIZED_PROCESSING_TIME:
- return synchronizedProcessingTimers;
- default:
- throw new IllegalArgumentException("Unexpected time domain: " + domain);
- }
- }
-
- @Override
- public void setTimer(StateNamespace namespace, String timerId, Instant target,
- TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
- }
-
- @Override
- public void setTimer(TimerData timerData) {
- WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
- if (existingTimers.add(timerData)) {
- queue(timerData.getDomain()).add(timerData);
- }
- }
-
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
- }
-
- @Override
- public void deleteTimer(TimerData timer) {
- WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
- existingTimers.remove(timer);
- queue(timer.getDomain()).remove(timer);
- }
-
- @Override
- public Instant currentProcessingTime() {
- return processingTime;
- }
-
- @Override
- @Nullable
- public Instant currentSynchronizedProcessingTime() {
- return synchronizedProcessingTime;
- }
-
- @Override
- public Instant currentInputWatermarkTime() {
- return inputWatermarkTime;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("watermarkTimers", watermarkTimers)
- .add("processingTimers", processingTimers)
- .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
- .add("inputWatermarkTime", inputWatermarkTime)
- .add("outputWatermarkTime", outputWatermarkTime)
- .add("processingTime", processingTime)
- .toString();
- }
-
- /** Advances input watermark to the given value. */
- public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
- checkNotNull(newInputWatermark);
- checkState(
- !newInputWatermark.isBefore(inputWatermarkTime),
- "Cannot move input watermark time backwards from %s to %s",
- inputWatermarkTime,
- newInputWatermark);
- WindowTracing.trace(
- "{}.advanceInputWatermark: from {} to {}",
- getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
- inputWatermarkTime = newInputWatermark;
- }
-
- /** Advances output watermark to the given value. */
- public void advanceOutputWatermark(Instant newOutputWatermark) {
- checkNotNull(newOutputWatermark);
- final Instant adjustedOutputWatermark;
- if (newOutputWatermark.isAfter(inputWatermarkTime)) {
- WindowTracing.trace(
- "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
- getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
- adjustedOutputWatermark = inputWatermarkTime;
- } else {
- adjustedOutputWatermark = newOutputWatermark;
- }
-
- checkState(
- outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
- "Cannot move output watermark time backwards from %s to %s",
- outputWatermarkTime,
- adjustedOutputWatermark);
- WindowTracing.trace(
- "{}.advanceOutputWatermark: from {} to {}",
- getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
- outputWatermarkTime = adjustedOutputWatermark;
- }
-
- /** Advances processing time to the given value. */
- public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
- checkNotNull(newProcessingTime);
- checkState(
- !newProcessingTime.isBefore(processingTime),
- "Cannot move processing time backwards from %s to %s",
- processingTime,
- newProcessingTime);
- WindowTracing.trace(
- "{}.advanceProcessingTime: from {} to {}",
- getClass().getSimpleName(), processingTime, newProcessingTime);
- processingTime = newProcessingTime;
- }
-
- /** Advances synchronized processing time to the given value. */
- public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
- throws Exception {
- checkNotNull(newSynchronizedProcessingTime);
- checkState(
- !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
- "Cannot move processing time backwards from %s to %s",
- synchronizedProcessingTime,
- newSynchronizedProcessingTime);
- WindowTracing.trace(
- "{}.advanceProcessingTime: from {} to {}",
- getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
- synchronizedProcessingTime = newSynchronizedProcessingTime;
- }
-
- /** Returns the next eligible event time timer, if none returns null. */
- @Nullable
- public TimerData removeNextEventTimer() {
- TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
- if (timer != null) {
- WindowTracing.trace(
- "{}.removeNextEventTimer: firing {} at {}",
- getClass().getSimpleName(), timer, inputWatermarkTime);
- }
- return timer;
- }
-
- /** Returns the next eligible processing time timer, if none returns null. */
- @Nullable
- public TimerData removeNextProcessingTimer() {
- TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
- if (timer != null) {
- WindowTracing.trace(
- "{}.removeNextProcessingTimer: firing {} at {}",
- getClass().getSimpleName(), timer, processingTime);
- }
- return timer;
- }
-
- /** Returns the next eligible synchronized processing time timer, if none returns null. */
- @Nullable
- public TimerData removeNextSynchronizedProcessingTimer() {
- TimerData timer = removeNextTimer(
- synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
- if (timer != null) {
- WindowTracing.trace(
- "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
- getClass().getSimpleName(), timer, synchronizedProcessingTime);
- }
- return timer;
- }
-
- @Nullable
- private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
- PriorityQueue<TimerData> queue = queue(domain);
- if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
- TimerData timer = queue.remove();
- existingTimers.remove(timer);
- return timer;
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
deleted file mode 100644
index 4a2763c..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link InMemoryTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class InMemoryTimerInternalsTest {
-
- private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
- @Test
- public void testFiringTimers() throws Exception {
- InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
- underTest.setTimer(processingTime1);
- underTest.setTimer(processingTime2);
-
- underTest.advanceProcessingTime(new Instant(20));
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
-
- // Advancing just a little shouldn't refire
- underTest.advanceProcessingTime(new Instant(21));
- assertNull(underTest.removeNextProcessingTimer());
-
- // Adding the timer and advancing a little should refire
- underTest.setTimer(processingTime1);
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
-
- // And advancing the rest of the way should still have the other timer
- underTest.advanceProcessingTime(new Instant(30));
- assertEquals(processingTime2, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
- }
-
- @Test
- public void testFiringTimersWithCallback() throws Exception {
- InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
- underTest.setTimer(processingTime1);
- underTest.setTimer(processingTime2);
-
- underTest.advanceProcessingTime(new Instant(20));
- assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
- assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
- // Advancing just a little shouldn't refire
- underTest.advanceProcessingTime(new Instant(21));
- assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
- // Adding the timer and advancing a little should fire again
- underTest.setTimer(processingTime1);
- underTest.advanceProcessingTime(new Instant(21));
- assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
- assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
- // And advancing the rest of the way should still have the other timer
- underTest.advanceProcessingTime(new Instant(30));
- assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
- assertThat(underTest.removeNextProcessingTimer(), nullValue());
- }
-
- @Test
- public void testTimerOrdering() throws Exception {
- InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- TimerData synchronizedProcessingTime1 = TimerData.of(
- NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
- TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
- TimerData synchronizedProcessingTime2 = TimerData.of(
- NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
- underTest.setTimer(processingTime1);
- underTest.setTimer(eventTime1);
- underTest.setTimer(synchronizedProcessingTime1);
- underTest.setTimer(processingTime2);
- underTest.setTimer(eventTime2);
- underTest.setTimer(synchronizedProcessingTime2);
-
- assertNull(underTest.removeNextEventTimer());
- underTest.advanceInputWatermark(new Instant(30));
- assertEquals(eventTime1, underTest.removeNextEventTimer());
- assertEquals(eventTime2, underTest.removeNextEventTimer());
- assertNull(underTest.removeNextEventTimer());
-
- assertNull(underTest.removeNextProcessingTimer());
- underTest.advanceProcessingTime(new Instant(30));
- assertEquals(processingTime1, underTest.removeNextProcessingTimer());
- assertEquals(processingTime2, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
-
- assertNull(underTest.removeNextSynchronizedProcessingTimer());
- underTest.advanceSynchronizedProcessingTime(new Instant(30));
- assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
- assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
- }
-
- @Test
- public void testDeduplicate() throws Exception {
- InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
- TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- underTest.setTimer(eventTime);
- underTest.setTimer(eventTime);
- underTest.setTimer(processingTime);
- underTest.setTimer(processingTime);
- underTest.advanceProcessingTime(new Instant(20));
- underTest.advanceInputWatermark(new Instant(20));
-
- assertEquals(processingTime, underTest.removeNextProcessingTimer());
- assertNull(underTest.removeNextProcessingTimer());
- assertEquals(eventTime, underTest.removeNextEventTimer());
- assertNull(underTest.removeNextEventTimer());
- }
-}