You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/18 17:28:38 UTC

[beam] branch master updated: [BEAM-12742] Samza Runner does not properly delete modified timer (#15322)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bd51855  [BEAM-12742] Samza Runner does not properly delete modified timer (#15322)
bd51855 is described below

commit bd51855e0321aeff2fa4416ec8a7d12a7ae3945c
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Wed Aug 18 10:27:53 2021 -0700

    [BEAM-12742] Samza Runner does not properly delete modified timer (#15322)
---
 runners/samza/build.gradle                         |  32 ++++-
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 143 ++++++++++++---------
 .../runtime/SamzaTimerInternalsFactoryTest.java    | 107 +++++----------
 .../org/apache/beam/sdk/io/WriteFilesTest.java     |   8 +-
 .../beam/sdk/transforms/GroupIntoBatchesTest.java  |   3 +
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  34 ++++-
 6 files changed, 183 insertions(+), 144 deletions(-)

diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 23aa274..b4aff99 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -95,7 +95,10 @@ task validatesRunner(type: Test) {
   classpath = configurations.validatesRunner
   testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
   useJUnit {
+    includeCategories 'org.apache.beam.sdk.testing.NeedsRunner'
     includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+    excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
     excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
     excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
     excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
@@ -113,10 +116,35 @@ task validatesRunner(type: Test) {
     excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded'
     // TODO(BEAM-11479)
     excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
-    // https://issues.apache.org/jira/browse/BEAM-12035
+    // TODO(BEAM-11479)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testRelativeTimerWithOutputTimestamp'
+    // TODO(BEAM-12035)
     excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
-    // https://issues.apache.org/jira/browse/BEAM-12036
+    // TODO(BEAM-12036)
     excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+    // TODO(BEAM-12743)
+    excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingNPException'
+    excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingIOException'
+    excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingNPException'
+    excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingIOException'
+    // TODO(BEAM-12744)
+    excludeTestsMatching 'org.apache.beam.sdk.PipelineTest.testEmptyPipeline'
+    // TODO(BEAM-12745)
+    excludeTestsMatching 'org.apache.beam.sdk.io.AvroIOTest*'
+    // TODO(BEAM-12746)
+    excludeTestsMatching 'org.apache.beam.sdk.io.FileIOTest*'
+    // TODO(BEAM-12747)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsBackwardsInTimeShouldThrow'
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsWithNullTimestampShouldThrow'
+    // TODO(BEAM-12748)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput'
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput'
+    // TODO(BEAM-12749)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.MapElementsTest.testMapSimpleFunction'
+    // TODO(BEAM-12750)
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSizeFn'
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode'
+    excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow'
 
     // These tests fail since there is no support for side inputs in Samza's unbounded splittable DoFn integration
     excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testWindowedSideInputWithCheckpointsUnbounded'
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 4b34a25..f791c3a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -180,7 +180,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     final Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>();
 
     while (!eventTimeBuffer.isEmpty()
-        && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)
+        && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)
         && readyTimers.size() < maxReadyTimersToProcessOnce) {
 
       final KeyedTimerData<K> keyedTimerData = eventTimeBuffer.pollFirst();
@@ -262,72 +262,81 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
       final Long lastTimestamp = state.get(keyedTimerData);
       final Long newTimestamp = timerData.getTimestamp().getMillis();
 
-      if (!newTimestamp.equals(lastTimestamp)) {
-        if (lastTimestamp != null) {
-          final TimerData lastTimerData =
-              TimerData.of(
-                  timerData.getTimerId(),
-                  timerData.getNamespace(),
-                  new Instant(lastTimestamp),
-                  new Instant(lastTimestamp),
-                  timerData.getDomain());
-          deleteTimer(lastTimerData, false);
-        }
+      if (newTimestamp.equals(lastTimestamp)) {
+        return;
+      }
+
+      if (lastTimestamp != null) {
+        final TimerData lastTimerData =
+            TimerData.of(
+                timerData.getTimerId(),
+                timerData.getTimerFamilyId(),
+                timerData.getNamespace(),
+                new Instant(lastTimestamp),
+                new Instant(lastTimestamp),
+                timerData.getDomain());
+        deleteTimer(lastTimerData, false);
+      }
+
+      // persist it first
+      state.persist(keyedTimerData);
 
-        // persist it first
-        state.persist(keyedTimerData);
-
-        // TO-DO: apply the same memory optimization over processing timers
-        switch (timerData.getDomain()) {
-          case EVENT_TIME:
-            /**
-             * To determine if the upcoming KeyedTimerData could be added to the Buffer while
-             * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
-             * timestamp eviction priority:
-             *
-             * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is
-             * empty, therefore all the Event times greater or lesser than newTimestamp are in the
-             * buffer;
-             *
-             * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
-             * greater than newTimestamp, so it is safe to add it to the buffer
-             *
-             * <p>In case that the Buffer is full, we remove the largest timer from memory according
-             * to {@link KeyedTimerData.compareTo()}
-             */
-            if (newTimestamp < maxEventTimeInBuffer) {
-              eventTimeBuffer.add(keyedTimerData);
-              if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
-                eventTimeBuffer.pollLast();
-                maxEventTimeInBuffer =
-                    eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
-              }
+      // TO-DO: apply the same memory optimization over processing timers
+      switch (timerData.getDomain()) {
+        case EVENT_TIME:
+          /**
+           * To determine if the upcoming KeyedTimerData could be added to the Buffer while
+           * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
+           * timestamp eviction priority:
+           *
+           * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
+           * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
+           *
+           * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
+           * greater than newTimestamp, so it is safe to add it to the buffer
+           *
+           * <p>In case that the Buffer is full, we remove the largest timer from memory according
+           * to {@link KeyedTimerData.compareTo()}
+           */
+          if (newTimestamp < maxEventTimeInBuffer) {
+            eventTimeBuffer.add(keyedTimerData);
+            if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
+              eventTimeBuffer.pollLast();
+              maxEventTimeInBuffer =
+                  eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
             }
-            break;
+          }
+          break;
 
-          case PROCESSING_TIME:
-            timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
-            break;
+        case PROCESSING_TIME:
+          timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+          break;
 
-          default:
-            throw new UnsupportedOperationException(
-                String.format(
-                    "%s currently only supports even time or processing time", SamzaRunner.class));
-        }
+        default:
+          throw new UnsupportedOperationException(
+              String.format(
+                  "%s currently only supports even time or processing time", SamzaRunner.class));
       }
     }
 
     @Override
     public void deleteTimer(
         StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
-      Instant now = Instant.now();
-      deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain));
+      TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
+
+      Long lastTimestamp = state.get(timerKey, timeDomain);
+
+      if (lastTimestamp == null) {
+        return;
+      }
+
+      Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
+      deleteTimer(TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain));
     }
 
     @Override
     public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-      Instant now = Instant.now();
-      deleteTimer(TimerData.of(timerId, namespace, now, now, TimeDomain.EVENT_TIME));
+      deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
     }
 
     @Override
@@ -426,13 +435,16 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
     }
 
     Long get(KeyedTimerData<K> keyedTimerData) {
-      final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
-      switch (keyedTimerData.getTimerData().getDomain()) {
+      return get(TimerKey.of(keyedTimerData), keyedTimerData.getTimerData().getDomain());
+    }
+
+    Long get(TimerKey<K> key, TimeDomain domain) {
+      switch (domain) {
         case EVENT_TIME:
-          return eventTimeTimerState.get(timerKey).read();
+          return eventTimeTimerState.get(key).read();
 
         case PROCESSING_TIME:
-          return processingTimeTimerState.get(timerKey).read();
+          return processingTimeTimerState.get(key).read();
 
         default:
           throw new UnsupportedOperationException(
@@ -587,11 +599,20 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
     static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
       final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
+      return of(
+          keyedTimerData.getKey(),
+          timerData.getNamespace(),
+          timerData.getTimerId(),
+          timerData.getTimerFamilyId());
+    }
+
+    static <K> TimerKey<K> of(
+        K key, StateNamespace namespace, String timerId, String timerFamilyId) {
       return TimerKey.<K>builder()
-          .setKey(keyedTimerData.getKey())
-          .setStateNamespace(timerData.getNamespace())
-          .setTimerId(timerData.getTimerId())
-          .setTimerFamilyId(timerData.getTimerFamilyId())
+          .setKey(key)
+          .setStateNamespace(namespace)
+          .setTimerId(timerId)
+          .setTimerFamilyId(timerFamilyId)
           .build();
     }
 
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 0f4ac5d..291519d 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -425,12 +425,8 @@ public class SamzaTimerInternalsFactoryTest {
     store.close();
   }
 
-  /**
-   * Test the number of event time timers maintained in memory does not go beyond the limit defined
-   * in pipeline option.
-   */
   @Test
-  public void testEventTimeTimersMemoryBoundary1() {
+  public void testBufferSizeNotExceedingPipelineOptionValue() {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     pipelineOptions.setEventTimerBufferSize(2);
@@ -445,30 +441,19 @@ public class SamzaTimerInternalsFactoryTest {
     // prepare 5 timers.
     // timers in memory are then timestamped from 0 - 1;
     // timers in store are then timestamped from 0 - 4.
-    TimerInternals.TimerData timer;
     for (int i = 0; i < 5; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
-    timerInternalsFactory.setInputWatermark(new Instant(2));
-    Collection<KeyedTimerData<String>> readyTimers;
-
-    readyTimers = timerInternalsFactory.removeReadyTimers();
-    assertEquals(2, readyTimers.size());
+    // only two timers are supposed to be in event time buffer
     assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
 
     store.close();
   }
 
-  /**
-   * Test the total number of event time timers reloaded into memory is aligned with the number of
-   * event time timers written to the store.
-   */
   @Test
-  public void testEventTimeTimersMemoryBoundary2() {
+  public void testAllTimersAreFiredWithReload() {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     pipelineOptions.setEventTimerBufferSize(2);
@@ -483,18 +468,16 @@ public class SamzaTimerInternalsFactoryTest {
     // prepare 3 timers.
     // timers in memory now are timestamped from 0 - 1;
     // timers in store now are timestamped from 0 - 2.
-    TimerInternals.TimerData timer;
     for (int i = 0; i < 3; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
     // total number of event time timers to fire equals to the number of timers in store
     Collection<KeyedTimerData<String>> readyTimers;
     timerInternalsFactory.setInputWatermark(new Instant(3));
     readyTimers = timerInternalsFactory.removeReadyTimers();
+    // buffer should reload from store and all timers are supposed to be fired.
     assertEquals(3, readyTimers.size());
 
     store.close();
@@ -506,7 +489,7 @@ public class SamzaTimerInternalsFactoryTest {
    * maintained in order.
    */
   @Test
-  public void testEventTimeTimersMemoryBoundary3() {
+  public void testAllTimersAreFiredInOrder() {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     pipelineOptions.setEventTimerBufferSize(5);
@@ -521,19 +504,16 @@ public class SamzaTimerInternalsFactoryTest {
     // prepare 8 timers.
     // timers in memory now are timestamped from 0 - 4;
     // timers in store now are timestamped from 0 - 7.
-    TimerInternals.TimerData timer;
     for (int i = 0; i < 8; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
     // fire the first 2 timers.
     // timers in memory now are timestamped from 2 - 4;
     // timers in store now are timestamped from 2 - 7.
     Collection<KeyedTimerData<String>> readyTimers;
-    timerInternalsFactory.setInputWatermark(new Instant(2));
+    timerInternalsFactory.setInputWatermark(new Instant(1));
     long lastTimestamp = 0;
     readyTimers = timerInternalsFactory.removeReadyTimers();
     for (KeyedTimerData<String> keyedTimerData : readyTimers) {
@@ -549,10 +529,8 @@ public class SamzaTimerInternalsFactoryTest {
     // timers in store now are timestamped from 2 - 19.
     // the total number of timers to fire is 18.
     for (int i = 8; i < 20; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
     timerInternalsFactory.setInputWatermark(new Instant(20));
     lastTimestamp = 0;
@@ -567,14 +545,8 @@ public class SamzaTimerInternalsFactoryTest {
     store.close();
   }
 
-  /**
-   * Test the total number of event time timers reloaded into memory is aligned with the number of
-   * the event time timers written to the store. Moreover, event time timers reloaded into memory is
-   * maintained in order, even though memory boundary is hit and timer is early than the last timer
-   * in memory.
-   */
   @Test
-  public void testEventTimeTimersMemoryBoundary4() {
+  public void testNewTimersAreInsertedInOrder() {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     pipelineOptions.setEventTimerBufferSize(5);
@@ -586,22 +558,19 @@ public class SamzaTimerInternalsFactoryTest {
     final StateNamespace nameSpace = StateNamespaces.global();
     final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
 
-    // prepare 8 timers.
+    // prepare 10 timers.
     // timers in memory now are timestamped from 0 - 4;
     // timers in store now are timestamped from 0 - 9.
-    TimerInternals.TimerData timer;
     for (int i = 0; i < 10; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
     // fire the first 2 timers.
     // timers in memory now are timestamped from 2 - 4;
     // timers in store now are timestamped from 2 - 9.
     Collection<KeyedTimerData<String>> readyTimers;
-    timerInternalsFactory.setInputWatermark(new Instant(2));
+    timerInternalsFactory.setInputWatermark(new Instant(1));
     long lastTimestamp = 0;
     readyTimers = timerInternalsFactory.removeReadyTimers();
     for (KeyedTimerData<String> keyedTimerData : readyTimers) {
@@ -611,16 +580,14 @@ public class SamzaTimerInternalsFactoryTest {
     }
     assertEquals(2, readyTimers.size());
 
-    // add 3 timers.
+    // add 3 timers but timer 2 has duplicate so drop.
     // timers in memory now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to
     // 4 prefixed with timer, timestamp is in order;
     // timers in store now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to 9
     // prefixed with timer, timestamp is in order;
     for (int i = 0; i < 3; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "lateTimer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
     // there are 11 timers in state now.
@@ -647,15 +614,14 @@ public class SamzaTimerInternalsFactoryTest {
       assertTrue(lastTimestamp <= currentTimeStamp);
       lastTimestamp = currentTimeStamp;
     }
-    assertEquals(5, readyTimers.size());
+    assertEquals(4, readyTimers.size());
     assertEquals(0, timerInternalsFactory.getEventTimeBuffer().size());
 
     store.close();
   }
 
-  /** Test buffer could still be filled after restore to a non-full state. */
   @Test
-  public void testEventTimeTimersMemoryBoundary5() {
+  public void testBufferRefilledAfterRestoreToNonFullState() {
     final SamzaPipelineOptions pipelineOptions =
         PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
     pipelineOptions.setEventTimerBufferSize(5);
@@ -670,37 +636,28 @@ public class SamzaTimerInternalsFactoryTest {
     // prepare (buffer capacity + 1) 6 timers.
     // timers in memory now are timestamped from 0 - 4;
     // timer in store now is timestamped 6.
-    TimerInternals.TimerData timer;
     for (int i = 0; i < 6; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
 
     // total number of event time timers to fire equals to the number of timers in store
     Collection<KeyedTimerData<String>> readyTimers;
-    timerInternalsFactory.setInputWatermark(new Instant(5));
+    timerInternalsFactory.setInputWatermark(new Instant(4));
     readyTimers = timerInternalsFactory.removeReadyTimers();
     assertEquals(5, readyTimers.size());
     // reloaded timer5
     assertEquals(1, timerInternalsFactory.getEventTimeBuffer().size());
 
-    for (int i = 0; i < 7; i++) {
-      timer =
-          TimerInternals.TimerData.of(
-              "timer" + (i + 6),
-              nameSpace,
-              new Instant(i + 6),
-              new Instant(i + 6),
-              TimeDomain.EVENT_TIME);
-      timerInternals.setTimer(timer);
+    for (int i = 6; i < 13; i++) {
+      timerInternals.setTimer(
+          nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
     }
     // timers should go into buffer not state
     assertEquals(5, timerInternalsFactory.getEventTimeBuffer().size());
 
-    // watermark 12 comes, so all timers will be evicted in order.
-    timerInternalsFactory.setInputWatermark(new Instant(11));
+    // watermark 10 comes,6 timers will be evicted in order and 2 still in buffer.
+    timerInternalsFactory.setInputWatermark(new Instant(10));
     readyTimers = timerInternalsFactory.removeReadyTimers();
     long lastTimestamp = 0;
     for (KeyedTimerData<String> keyedTimerData : readyTimers) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 4fa342e..059abcf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -329,7 +330,12 @@ public class WriteFilesTest {
   }
 
   @Test
-  @Category({NeedsRunner.class, UsesUnboundedPCollections.class, UsesTestStream.class})
+  @Category({
+    NeedsRunner.class,
+    UsesUnboundedPCollections.class,
+    UsesTestStream.class,
+    UsesTestStreamWithProcessingTime.class
+  })
   public void testWithRunnerDeterminedShardingTestStream() throws IOException {
     List<String> elements = Lists.newArrayList();
     for (int i = 0; i < 30; ++i) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
index 5ba8da1..316e258 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
 import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
 import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterPane;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -451,6 +452,7 @@ public class GroupIntoBatchesTest implements Serializable {
     NeedsRunner.class,
     UsesTimersInParDo.class,
     UsesTestStream.class,
+    UsesTestStreamWithProcessingTime.class,
     UsesStatefulParDo.class
   })
   public void testBufferingTimerInFixedWindow() {
@@ -573,6 +575,7 @@ public class GroupIntoBatchesTest implements Serializable {
     NeedsRunner.class,
     UsesTimersInParDo.class,
     UsesTestStream.class,
+    UsesTestStreamWithProcessingTime.class,
     UsesStatefulParDo.class
   })
   public void testBufferingTimerInGlobalWindow() {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4477151..25a574a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3994,6 +3994,7 @@ public class ParDoTest implements Serializable {
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
+      UsesTestStream.class,
       UsesTestStreamWithProcessingTime.class
     })
     public void testSimpleProcessingTimerTimer() throws Exception {
@@ -4165,6 +4166,7 @@ public class ParDoTest implements Serializable {
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
+      UsesTestStream.class,
       UsesTestStreamWithProcessingTime.class
     })
     public void testProcessingTimeTimerCanBeReset() throws Exception {
@@ -4589,6 +4591,7 @@ public class ParDoTest implements Serializable {
       ValidatesRunner.class,
       UsesStatefulParDo.class,
       UsesTimersInParDo.class,
+      UsesTestStream.class,
       UsesTestStreamWithProcessingTime.class,
       UsesTestStreamWithOutputTimestamp.class
     })
@@ -4670,7 +4673,7 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category(NeedsRunner.class)
+    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
     public void testRelativeTimerWithOutputTimestamp() {
       DoFn<KV<Void, String>, String> buffferFn =
           new DoFn<KV<Void, String>, String>() {
@@ -4854,7 +4857,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    @Category({
+      NeedsRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesTestStreamWithProcessingTime.class
+    })
     public void testSetAndClearProcessingTimeTimer() {
 
       final String timerId = "processing-timer";
@@ -4929,7 +4937,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    @Category({
+      NeedsRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesTestStreamWithProcessingTime.class
+    })
     public void testClearUnsetProcessingTimeTimer() {
       final String timerId = "processing-timer";
 
@@ -5001,7 +5014,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    @Category({
+      NeedsRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesTestStreamWithProcessingTime.class
+    })
     public void testClearProcessingTimeTimer() {
       final String timerId = "processing-timer";
       final String clearTimerId = "clear-timer";
@@ -5111,7 +5129,12 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
-    @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+    @Category({
+      NeedsRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesTestStreamWithProcessingTime.class
+    })
     public void testSetProcessingTimerAfterClear() {
       final String timerId = "processing-timer";
 
@@ -5745,6 +5768,7 @@ public class ParDoTest implements Serializable {
     @Category({
       ValidatesRunner.class,
       UsesTimersInParDo.class,
+      UsesTestStream.class,
       UsesTestStreamWithProcessingTime.class,
       UsesTimerMap.class
     })