You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:49:54 UTC

[20/51] [abbrv] incubator-beam git commit: Move InMemoryTimerInternals to runners-core

Move InMemoryTimerInternals to runners-core


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/445c1205
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/445c1205
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/445c1205

Branch: refs/heads/python-sdk
Commit: 445c120510948fb23e6d35b502da1e5a4f0ffdfb
Parents: 22e25a4
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 15 20:45:56 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 11:21:52 2016 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   1 -
 .../runners/core/InMemoryTimerInternals.java    | 273 ++++++++++++++++++
 .../core/InMemoryTimerInternalsTest.java        | 155 +++++++++++
 .../beam/runners/core/ReduceFnTester.java       |   1 -
 .../beam/runners/core/SplittableParDoTest.java  |  16 +-
 .../triggers/TriggerStateMachineTester.java     |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |   2 +-
 .../apache/beam/sdk/transforms/DoFnTester.java  |  36 ---
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 -------------------
 .../util/state/InMemoryTimerInternalsTest.java  | 153 -----------
 10 files changed, 443 insertions(+), 471 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 9189191..efcd771 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
new file mode 100644
index 0000000..5fcd088
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.joda.time.Instant;
+
+/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */
+public class InMemoryTimerInternals implements TimerInternals {
+
+  /** At most one timer per timestamp is kept. */
+  private Set<TimerData> existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+  /** Pending processing time timers, in timestamp order. */
+  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  /** Pending synchronized processing time timers, in timestamp order. */
+  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+
+  /** Current input watermark. */
+  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  /** Current processing time. */
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current synchronized processing time. */
+  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  /**
+   * Returns when the next timer in the given time domain will fire, or {@code null}
+   * if there are no timers scheduled in that time domain.
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    final TimerData data;
+    switch (domain) {
+      case EVENT_TIME:
+        data = watermarkTimers.peek();
+        break;
+      case PROCESSING_TIME:
+        data = processingTimers.peek();
+        break;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        data = synchronizedProcessingTimers.peek();
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + domain);
+    }
+    return (data == null) ? null : data.getTimestamp();
+  }
+
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    switch (domain) {
+      case EVENT_TIME:
+        return watermarkTimers;
+      case PROCESSING_TIME:
+        return processingTimers;
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return synchronizedProcessingTimers;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + domain);
+    }
+  }
+
+  @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant target,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+  }
+
+  @Override
+  public void setTimer(TimerData timerData) {
+    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
+    if (existingTimers.add(timerData)) {
+      queue(timerData.getDomain()).add(timerData);
+    }
+  }
+
+  @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return synchronizedProcessingTime;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return inputWatermarkTime;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
+        .add("inputWatermarkTime", inputWatermarkTime)
+        .add("outputWatermarkTime", outputWatermarkTime)
+        .add("processingTime", processingTime)
+        .toString();
+  }
+
+  /** Advances input watermark to the given value. */
+  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+    checkNotNull(newInputWatermark);
+    checkState(
+        !newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s",
+        inputWatermarkTime,
+        newInputWatermark);
+    WindowTracing.trace(
+        "{}.advanceInputWatermark: from {} to {}",
+        getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+  }
+
+  /** Advances output watermark to the given value. */
+  public void advanceOutputWatermark(Instant newOutputWatermark) {
+    checkNotNull(newOutputWatermark);
+    final Instant adjustedOutputWatermark;
+    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+      WindowTracing.trace(
+          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
+          getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
+      adjustedOutputWatermark = inputWatermarkTime;
+    } else {
+      adjustedOutputWatermark = newOutputWatermark;
+    }
+
+    checkState(
+        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
+        "Cannot move output watermark time backwards from %s to %s",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    WindowTracing.trace(
+        "{}.advanceOutputWatermark: from {} to {}",
+        getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
+    outputWatermarkTime = adjustedOutputWatermark;
+  }
+
+  /** Advances processing time to the given value. */
+  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+    checkNotNull(newProcessingTime);
+    checkState(
+        !newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s",
+        processingTime,
+        newProcessingTime);
+    WindowTracing.trace(
+        "{}.advanceProcessingTime: from {} to {}",
+        getClass().getSimpleName(), processingTime, newProcessingTime);
+    processingTime = newProcessingTime;
+  }
+
+  /** Advances synchronized processing time to the given value. */
+  public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
+      throws Exception {
+    checkNotNull(newSynchronizedProcessingTime);
+    checkState(
+        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+        "Cannot move processing time backwards from %s to %s",
+        synchronizedProcessingTime,
+        newSynchronizedProcessingTime);
+    WindowTracing.trace(
+        "{}.advanceProcessingTime: from {} to {}",
+        getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
+    synchronizedProcessingTime = newSynchronizedProcessingTime;
+  }
+
+  /** Returns the next eligible event time timer, if none returns null. */
+  @Nullable
+  public TimerData removeNextEventTimer() {
+    TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextEventTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, inputWatermarkTime);
+    }
+    return timer;
+  }
+
+  /** Returns the next eligible processing time timer, if none returns null. */
+  @Nullable
+  public TimerData removeNextProcessingTimer() {
+    TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextProcessingTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, processingTime);
+    }
+    return timer;
+  }
+
+  /** Returns the next eligible synchronized processing time timer, if none returns null. */
+  @Nullable
+  public TimerData removeNextSynchronizedProcessingTimer() {
+    TimerData timer = removeNextTimer(
+        synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    if (timer != null) {
+      WindowTracing.trace(
+          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
+          getClass().getSimpleName(), timer, synchronizedProcessingTime);
+    }
+    return timer;
+  }
+
+  @Nullable
+  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
+    PriorityQueue<TimerData> queue = queue(domain);
+    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+      TimerData timer = queue.remove();
+      existingTimers.remove(timer);
+      return timer;
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
new file mode 100644
index 0000000..2caa874
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaceForTest;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InMemoryTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryTimerInternalsTest {
+
+  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+  @Test
+  public void testFiringTimers() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(new Instant(20));
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(new Instant(21));
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // Adding the timer and advancing a little should refire
+    underTest.setTimer(processingTime1);
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(new Instant(30));
+    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+  }
+
+  @Test
+  public void testFiringTimersWithCallback() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(new Instant(20));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(new Instant(21));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // Adding the timer and advancing a little should fire again
+    underTest.setTimer(processingTime1);
+    underTest.advanceProcessingTime(new Instant(21));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(new Instant(30));
+    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
+    assertThat(underTest.removeNextProcessingTimer(), nullValue());
+  }
+
+  @Test
+  public void testTimerOrdering() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime1 = TimerData.of(
+        NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+    TimerData synchronizedProcessingTime2 = TimerData.of(
+        NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(eventTime1);
+    underTest.setTimer(synchronizedProcessingTime1);
+    underTest.setTimer(processingTime2);
+    underTest.setTimer(eventTime2);
+    underTest.setTimer(synchronizedProcessingTime2);
+
+    assertNull(underTest.removeNextEventTimer());
+    underTest.advanceInputWatermark(new Instant(30));
+    assertEquals(eventTime1, underTest.removeNextEventTimer());
+    assertEquals(eventTime2, underTest.removeNextEventTimer());
+    assertNull(underTest.removeNextEventTimer());
+
+    assertNull(underTest.removeNextProcessingTimer());
+    underTest.advanceProcessingTime(new Instant(30));
+    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
+    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+
+    assertNull(underTest.removeNextSynchronizedProcessingTimer());
+    underTest.advanceSynchronizedProcessingTime(new Instant(30));
+    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
+    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+  }
+
+  @Test
+  public void testDeduplicate() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    underTest.setTimer(eventTime);
+    underTest.setTimer(eventTime);
+    underTest.setTimer(processingTime);
+    underTest.setTimer(processingTime);
+    underTest.advanceProcessingTime(new Instant(20));
+    underTest.advanceInputWatermark(new Instant(20));
+
+    assertEquals(processingTime, underTest.removeNextProcessingTimer());
+    assertNull(underTest.removeNextProcessingTimer());
+    assertEquals(eventTime, underTest.removeNextEventTimer());
+    assertNull(underTest.removeNextEventTimer());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index db0cf91..890195a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -73,7 +73,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 0f0b106..74a566b 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -196,6 +197,8 @@ public class SplittableParDoTest {
         tester;
     private Instant currentProcessingTime;
 
+    private InMemoryTimerInternals timerInternals;
+
     ProcessFnTester(
         Instant currentProcessingTime,
         DoFn<InputT, OutputT> fn,
@@ -206,6 +209,7 @@ public class SplittableParDoTest {
           new SplittableParDo.ProcessFn<>(
               fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
       this.tester = DoFnTester.of(processFn);
+      this.timerInternals = new InMemoryTimerInternals();
       processFn.setStateInternalsFactory(
           new StateInternalsFactory<String>() {
             @Override
@@ -217,7 +221,7 @@ public class SplittableParDoTest {
           new TimerInternalsFactory<String>() {
             @Override
             public TimerInternals timerInternalsForKey(String key) {
-              return tester.getTimerInternals();
+              return timerInternals;
             }
           });
       processFn.setOutputWindowedValue(
@@ -253,7 +257,7 @@ public class SplittableParDoTest {
       // through the state/timer/output callbacks.
       this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE);
       this.tester.startBundle();
-      this.tester.advanceProcessingTime(currentProcessingTime);
+      timerInternals.advanceProcessingTime(currentProcessingTime);
 
       this.currentProcessingTime = currentProcessingTime;
     }
@@ -291,7 +295,13 @@ public class SplittableParDoTest {
      */
     boolean advanceProcessingTimeBy(Duration duration) throws Exception {
       currentProcessingTime = currentProcessingTime.plus(duration);
-      List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
+      timerInternals.advanceProcessingTime(currentProcessingTime);
+
+      List<TimerInternals.TimerData> timers = new ArrayList<>();
+      TimerInternals.TimerData nextTimer;
+      while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(nextTimer);
+      }
       if (timers.isEmpty()) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index be63c06..2a626d4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ActiveWindowSet;
 import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.MergingActiveWindowSet;
 import org.apache.beam.runners.core.NonMergingActiveWindowSet;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,7 +47,6 @@ import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 87d3f50..5432d58 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 93b3f59..2d8684a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -46,12 +46,10 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -143,10 +141,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return (StateInternals<K>) stateInternals;
   }
 
-  public TimerInternals getTimerInternals() {
-    return timerInternals;
-  }
-
   /**
    * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage
    * the lifecycle of the {@link DoFn}.
@@ -233,7 +227,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     context.setupDelegateAggregators();
     // State and timer internals are per-bundle.
     stateInternals = InMemoryStateInternals.forKey(new Object());
-    timerInternals = new InMemoryTimerInternals();
     try {
       fnInvoker.invokeStartBundle(context);
     } catch (UserCodeException e) {
@@ -542,34 +535,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     return extractAggregatorValue(agg.getName(), agg.getCombineFn());
   }
 
-  public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) {
-    try {
-      timerInternals.advanceInputWatermark(newWatermark);
-      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
-      TimerInternals.TimerData timer;
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        firedTimers.add(timer);
-      }
-      return firedTimers;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) {
-    try {
-      timerInternals.advanceProcessingTime(newProcessingTime);
-      final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
-      TimerInternals.TimerData timer;
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        firedTimers.add(timer);
-      }
-      return firedTimers;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   private <AccumT, AggregateT> AggregateT extractAggregatorValue(
       String name, CombineFn<?, AccumT, AggregateT> combiner) {
     @SuppressWarnings("unchecked")
@@ -814,7 +779,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
   private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
 
   private InMemoryStateInternals<?> stateInternals;
-  private InMemoryTimerInternals timerInternals;
 
   /** The state of processing of the {@link DoFn} under test. */
   private State state = State.UNINITIALIZED;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
deleted file mode 100644
index 44b44f0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowTracing;
-import org.joda.time.Instant;
-
-/**
- * Simulates the firing of timers and progression of input and output watermarks for a single
- * computation and key in a Windmill-like streaming environment.
- */
-public class InMemoryTimerInternals implements TimerInternals {
-
-  /** At most one timer per timestamp is kept. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-  /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
-
-  /** Current input watermark. */
-  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current output watermark. */
-  @Nullable private Instant outputWatermarkTime = null;
-
-  /** Current processing time. */
-  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current synchronized processing time. */
-  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return outputWatermarkTime;
-  }
-
-  /**
-   * Returns when the next timer in the given time domain will fire, or {@code null}
-   * if there are no timers scheduled in that time domain.
-   */
-  @Nullable
-  public Instant getNextTimer(TimeDomain domain) {
-    final TimerData data;
-    switch (domain) {
-      case EVENT_TIME:
-        data = watermarkTimers.peek();
-        break;
-      case PROCESSING_TIME:
-        data = processingTimers.peek();
-        break;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.peek();
-        break;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-    return (data == null) ? null : data.getTimestamp();
-  }
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    switch (domain) {
-      case EVENT_TIME:
-        return watermarkTimers;
-      case PROCESSING_TIME:
-        return processingTimers;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        return synchronizedProcessingTimers;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-  }
-
-  @Override
-  public void setTimer(StateNamespace namespace, String timerId, Instant target,
-      TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void setTimer(TimerData timerData) {
-    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
-    if (existingTimers.add(timerData)) {
-      queue(timerData.getDomain()).add(timerData);
-    }
-  }
-
-  @Override
-  public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return synchronizedProcessingTime;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
-        .add("inputWatermarkTime", inputWatermarkTime)
-        .add("outputWatermarkTime", outputWatermarkTime)
-        .add("processingTime", processingTime)
-        .toString();
-  }
-
-  /** Advances input watermark to the given value. */
-  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
-    checkNotNull(newInputWatermark);
-    checkState(
-        !newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s",
-        inputWatermarkTime,
-        newInputWatermark);
-    WindowTracing.trace(
-        "{}.advanceInputWatermark: from {} to {}",
-        getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-  }
-
-  /** Advances output watermark to the given value. */
-  public void advanceOutputWatermark(Instant newOutputWatermark) {
-    checkNotNull(newOutputWatermark);
-    final Instant adjustedOutputWatermark;
-    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-      WindowTracing.trace(
-          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
-          getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
-      adjustedOutputWatermark = inputWatermarkTime;
-    } else {
-      adjustedOutputWatermark = newOutputWatermark;
-    }
-
-    checkState(
-        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
-        "Cannot move output watermark time backwards from %s to %s",
-        outputWatermarkTime,
-        adjustedOutputWatermark);
-    WindowTracing.trace(
-        "{}.advanceOutputWatermark: from {} to {}",
-        getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
-    outputWatermarkTime = adjustedOutputWatermark;
-  }
-
-  /** Advances processing time to the given value. */
-  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
-    checkNotNull(newProcessingTime);
-    checkState(
-        !newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s",
-        processingTime,
-        newProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), processingTime, newProcessingTime);
-    processingTime = newProcessingTime;
-  }
-
-  /** Advances synchronized processing time to the given value. */
-  public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
-      throws Exception {
-    checkNotNull(newSynchronizedProcessingTime);
-    checkState(
-        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-        "Cannot move processing time backwards from %s to %s",
-        synchronizedProcessingTime,
-        newSynchronizedProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
-    synchronizedProcessingTime = newSynchronizedProcessingTime;
-  }
-
-  /** Returns the next eligible event time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextEventTimer() {
-    TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextEventTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, inputWatermarkTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextProcessingTimer() {
-    TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, processingTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible synchronized processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextSynchronizedProcessingTimer() {
-    TimerData timer = removeNextTimer(
-        synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, synchronizedProcessingTime);
-    }
-    return timer;
-  }
-
-  @Nullable
-  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
-    PriorityQueue<TimerData> queue = queue(domain);
-    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
-      TimerData timer = queue.remove();
-      existingTimers.remove(timer);
-      return timer;
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/445c1205/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
deleted file mode 100644
index 4a2763c..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link InMemoryTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class InMemoryTimerInternalsTest {
-
-  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
-  @Test
-  public void testFiringTimers() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(new Instant(20));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(new Instant(21));
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-  }
-
-  @Test
-  public void testFiringTimersWithCallback() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(new Instant(20));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(new Instant(21));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // Adding the timer and advancing a little should fire again
-    underTest.setTimer(processingTime1);
-    underTest.advanceProcessingTime(new Instant(21));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(new Instant(30));
-    assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2));
-    assertThat(underTest.removeNextProcessingTimer(), nullValue());
-  }
-
-  @Test
-  public void testTimerOrdering() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTime1 = TimerData.of(
-        NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
-    TimerData synchronizedProcessingTime2 = TimerData.of(
-        NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(eventTime1);
-    underTest.setTimer(synchronizedProcessingTime1);
-    underTest.setTimer(processingTime2);
-    underTest.setTimer(eventTime2);
-    underTest.setTimer(synchronizedProcessingTime2);
-
-    assertNull(underTest.removeNextEventTimer());
-    underTest.advanceInputWatermark(new Instant(30));
-    assertEquals(eventTime1, underTest.removeNextEventTimer());
-    assertEquals(eventTime2, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
-
-    assertNull(underTest.removeNextProcessingTimer());
-    underTest.advanceProcessingTime(new Instant(30));
-    assertEquals(processingTime1, underTest.removeNextProcessingTimer());
-    assertEquals(processingTime2, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-
-    assertNull(underTest.removeNextSynchronizedProcessingTimer());
-    underTest.advanceSynchronizedProcessingTime(new Instant(30));
-    assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer());
-    assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-  }
-
-  @Test
-  public void testDeduplicate() throws Exception {
-    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    underTest.setTimer(eventTime);
-    underTest.setTimer(eventTime);
-    underTest.setTimer(processingTime);
-    underTest.setTimer(processingTime);
-    underTest.advanceProcessingTime(new Instant(20));
-    underTest.advanceInputWatermark(new Instant(20));
-
-    assertEquals(processingTime, underTest.removeNextProcessingTimer());
-    assertNull(underTest.removeNextProcessingTimer());
-    assertEquals(eventTime, underTest.removeNextEventTimer());
-    assertNull(underTest.removeNextEventTimer());
-  }
-}