You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2021/08/18 17:28:38 UTC
[beam] branch master updated: [BEAM-12742] Samza Runner does not
properly delete modified timer (#15322)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bd51855 [BEAM-12742] Samza Runner does not properly delete modified timer (#15322)
bd51855 is described below
commit bd51855e0321aeff2fa4416ec8a7d12a7ae3945c
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Wed Aug 18 10:27:53 2021 -0700
[BEAM-12742] Samza Runner does not properly delete modified timer (#15322)
---
runners/samza/build.gradle | 32 ++++-
.../samza/runtime/SamzaTimerInternalsFactory.java | 143 ++++++++++++---------
.../runtime/SamzaTimerInternalsFactoryTest.java | 107 +++++----------
.../org/apache/beam/sdk/io/WriteFilesTest.java | 8 +-
.../beam/sdk/transforms/GroupIntoBatchesTest.java | 3 +
.../org/apache/beam/sdk/transforms/ParDoTest.java | 34 ++++-
6 files changed, 183 insertions(+), 144 deletions(-)
diff --git a/runners/samza/build.gradle b/runners/samza/build.gradle
index 23aa274..b4aff99 100644
--- a/runners/samza/build.gradle
+++ b/runners/samza/build.gradle
@@ -95,7 +95,10 @@ task validatesRunner(type: Test) {
classpath = configurations.validatesRunner
testClassesDirs = files(project(":sdks:java:core").sourceSets.test.output.classesDirs)
useJUnit {
+ includeCategories 'org.apache.beam.sdk.testing.NeedsRunner'
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
+ excludeCategories 'org.apache.beam.sdk.testing.UsesSchema'
excludeCategories 'org.apache.beam.sdk.testing.LargeKeys$Above100MB'
excludeCategories 'org.apache.beam.sdk.testing.UsesAttemptedMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesCommittedMetrics'
@@ -113,10 +116,35 @@ task validatesRunner(type: Test) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestampDefaultUnbounded'
// TODO(BEAM-11479)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testOutputTimestamp'
- // https://issues.apache.org/jira/browse/BEAM-12035
+ // TODO(BEAM-11479)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimerTests.testRelativeTimerWithOutputTimestamp'
+ // TODO(BEAM-12035)
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testFirstElementLate'
- // https://issues.apache.org/jira/browse/BEAM-12036
+ // TODO(BEAM-12036)
excludeTestsMatching 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating'
+ // TODO(BEAM-12743)
+ excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingNPException'
+ excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testEncodingIOException'
+ excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingNPException'
+ excludeTestsMatching 'org.apache.beam.sdk.coders.PCollectionCustomCoderTest.testDecodingIOException'
+ // TODO(BEAM-12744)
+ excludeTestsMatching 'org.apache.beam.sdk.PipelineTest.testEmptyPipeline'
+ // TODO(BEAM-12745)
+ excludeTestsMatching 'org.apache.beam.sdk.io.AvroIOTest*'
+ // TODO(BEAM-12746)
+ excludeTestsMatching 'org.apache.beam.sdk.io.FileIOTest*'
+ // TODO(BEAM-12747)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsBackwardsInTimeShouldThrow'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.WithTimestampsTest.withTimestampsWithNullTimestampShouldThrow'
+ // TODO(BEAM-12748)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testEmptySingletonSideInput'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testNonSingletonSideInput'
+ // TODO(BEAM-12749)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.MapElementsTest.testMapSimpleFunction'
+ // TODO(BEAM-12750)
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInGlobalWindowBatchSizeByteSizeFn'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testInStreamingMode'
+ excludeTestsMatching 'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow'
// These tests fail since there is no support for side inputs in Samza's unbounded splittable DoFn integration
excludeTestsMatching 'org.apache.beam.sdk.transforms.SplittableDoFnTest.testWindowedSideInputWithCheckpointsUnbounded'
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 4b34a25..f791c3a 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -180,7 +180,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
final Collection<KeyedTimerData<K>> readyTimers = new ArrayList<>();
while (!eventTimeBuffer.isEmpty()
- && eventTimeBuffer.first().getTimerData().getTimestamp().isBefore(inputWatermark)
+ && !eventTimeBuffer.first().getTimerData().getTimestamp().isAfter(inputWatermark)
&& readyTimers.size() < maxReadyTimersToProcessOnce) {
final KeyedTimerData<K> keyedTimerData = eventTimeBuffer.pollFirst();
@@ -262,72 +262,81 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
final Long lastTimestamp = state.get(keyedTimerData);
final Long newTimestamp = timerData.getTimestamp().getMillis();
- if (!newTimestamp.equals(lastTimestamp)) {
- if (lastTimestamp != null) {
- final TimerData lastTimerData =
- TimerData.of(
- timerData.getTimerId(),
- timerData.getNamespace(),
- new Instant(lastTimestamp),
- new Instant(lastTimestamp),
- timerData.getDomain());
- deleteTimer(lastTimerData, false);
- }
+ if (newTimestamp.equals(lastTimestamp)) {
+ return;
+ }
+
+ if (lastTimestamp != null) {
+ final TimerData lastTimerData =
+ TimerData.of(
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId(),
+ timerData.getNamespace(),
+ new Instant(lastTimestamp),
+ new Instant(lastTimestamp),
+ timerData.getDomain());
+ deleteTimer(lastTimerData, false);
+ }
+
+ // persist it first
+ state.persist(keyedTimerData);
- // persist it first
- state.persist(keyedTimerData);
-
- // TO-DO: apply the same memory optimization over processing timers
- switch (timerData.getDomain()) {
- case EVENT_TIME:
- /**
- * To determine if the upcoming KeyedTimerData could be added to the Buffer while
- * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
- * timestamp eviction priority:
- *
- * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is
- * empty, therefore all the Event times greater or lesser than newTimestamp are in the
- * buffer;
- *
- * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
- * greater than newTimestamp, so it is safe to add it to the buffer
- *
- * <p>In case that the Buffer is full, we remove the largest timer from memory according
- * to {@link KeyedTimerData.compareTo()}
- */
- if (newTimestamp < maxEventTimeInBuffer) {
- eventTimeBuffer.add(keyedTimerData);
- if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
- eventTimeBuffer.pollLast();
- maxEventTimeInBuffer =
- eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
- }
+ // TO-DO: apply the same memory optimization over processing timers
+ switch (timerData.getDomain()) {
+ case EVENT_TIME:
+ /**
+ * To determine if the upcoming KeyedTimerData could be added to the Buffer while
+ * guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
+ * timestamp eviction priority:
+ *
+ * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
+ * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
+ *
+ * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
+ * greater than newTimestamp, so it is safe to add it to the buffer
+ *
+ * <p>In case that the Buffer is full, we remove the largest timer from memory according
+ * to {@link KeyedTimerData.compareTo()}
+ */
+ if (newTimestamp < maxEventTimeInBuffer) {
+ eventTimeBuffer.add(keyedTimerData);
+ if (eventTimeBuffer.size() > maxEventTimerBufferSize) {
+ eventTimeBuffer.pollLast();
+ maxEventTimeInBuffer =
+ eventTimeBuffer.last().getTimerData().getTimestamp().getMillis();
}
- break;
+ }
+ break;
- case PROCESSING_TIME:
- timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
- break;
+ case PROCESSING_TIME:
+ timerRegistry.schedule(keyedTimerData, timerData.getTimestamp().getMillis());
+ break;
- default:
- throw new UnsupportedOperationException(
- String.format(
- "%s currently only supports even time or processing time", SamzaRunner.class));
- }
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s currently only supports even time or processing time", SamzaRunner.class));
}
}
@Override
public void deleteTimer(
StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) {
- Instant now = Instant.now();
- deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain));
+ TimerKey<K> timerKey = TimerKey.of(key, namespace, timerId, timerFamilyId);
+
+ Long lastTimestamp = state.get(timerKey, timeDomain);
+
+ if (lastTimestamp == null) {
+ return;
+ }
+
+ Instant timestamp = Instant.ofEpochMilli(lastTimestamp);
+ deleteTimer(TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain));
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
- Instant now = Instant.now();
- deleteTimer(TimerData.of(timerId, namespace, now, now, TimeDomain.EVENT_TIME));
+ deleteTimer(namespace, timerId, timerFamilyId, TimeDomain.EVENT_TIME);
}
@Override
@@ -426,13 +435,16 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
}
Long get(KeyedTimerData<K> keyedTimerData) {
- final TimerKey<K> timerKey = TimerKey.of(keyedTimerData);
- switch (keyedTimerData.getTimerData().getDomain()) {
+ return get(TimerKey.of(keyedTimerData), keyedTimerData.getTimerData().getDomain());
+ }
+
+ Long get(TimerKey<K> key, TimeDomain domain) {
+ switch (domain) {
case EVENT_TIME:
- return eventTimeTimerState.get(timerKey).read();
+ return eventTimeTimerState.get(key).read();
case PROCESSING_TIME:
- return processingTimeTimerState.get(timerKey).read();
+ return processingTimeTimerState.get(key).read();
default:
throw new UnsupportedOperationException(
@@ -587,11 +599,20 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
static <K> TimerKey<K> of(KeyedTimerData<K> keyedTimerData) {
final TimerInternals.TimerData timerData = keyedTimerData.getTimerData();
+ return of(
+ keyedTimerData.getKey(),
+ timerData.getNamespace(),
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId());
+ }
+
+ static <K> TimerKey<K> of(
+ K key, StateNamespace namespace, String timerId, String timerFamilyId) {
return TimerKey.<K>builder()
- .setKey(keyedTimerData.getKey())
- .setStateNamespace(timerData.getNamespace())
- .setTimerId(timerData.getTimerId())
- .setTimerFamilyId(timerData.getTimerFamilyId())
+ .setKey(key)
+ .setStateNamespace(namespace)
+ .setTimerId(timerId)
+ .setTimerFamilyId(timerFamilyId)
.build();
}
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 0f4ac5d..291519d 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -425,12 +425,8 @@ public class SamzaTimerInternalsFactoryTest {
store.close();
}
- /**
- * Test the number of event time timers maintained in memory does not go beyond the limit defined
- * in pipeline option.
- */
@Test
- public void testEventTimeTimersMemoryBoundary1() {
+ public void testBufferSizeNotExceedingPipelineOptionValue() {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
pipelineOptions.setEventTimerBufferSize(2);
@@ -445,30 +441,19 @@ public class SamzaTimerInternalsFactoryTest {
// prepare 5 timers.
// timers in memory are then timestamped from 0 - 1;
// timers in store are then timestamped from 0 - 4.
- TimerInternals.TimerData timer;
for (int i = 0; i < 5; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
- timerInternalsFactory.setInputWatermark(new Instant(2));
- Collection<KeyedTimerData<String>> readyTimers;
-
- readyTimers = timerInternalsFactory.removeReadyTimers();
- assertEquals(2, readyTimers.size());
+ // only two timers are supposed to be in event time buffer
assertEquals(2, timerInternalsFactory.getEventTimeBuffer().size());
store.close();
}
- /**
- * Test the total number of event time timers reloaded into memory is aligned with the number of
- * event time timers written to the store.
- */
@Test
- public void testEventTimeTimersMemoryBoundary2() {
+ public void testAllTimersAreFiredWithReload() {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
pipelineOptions.setEventTimerBufferSize(2);
@@ -483,18 +468,16 @@ public class SamzaTimerInternalsFactoryTest {
// prepare 3 timers.
// timers in memory now are timestamped from 0 - 1;
// timers in store now are timestamped from 0 - 2.
- TimerInternals.TimerData timer;
for (int i = 0; i < 3; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// total number of event time timers to fire equals to the number of timers in store
Collection<KeyedTimerData<String>> readyTimers;
timerInternalsFactory.setInputWatermark(new Instant(3));
readyTimers = timerInternalsFactory.removeReadyTimers();
+ // buffer should reload from store and all timers are supposed to be fired.
assertEquals(3, readyTimers.size());
store.close();
@@ -506,7 +489,7 @@ public class SamzaTimerInternalsFactoryTest {
* maintained in order.
*/
@Test
- public void testEventTimeTimersMemoryBoundary3() {
+ public void testAllTimersAreFiredInOrder() {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
pipelineOptions.setEventTimerBufferSize(5);
@@ -521,19 +504,16 @@ public class SamzaTimerInternalsFactoryTest {
// prepare 8 timers.
// timers in memory now are timestamped from 0 - 4;
// timers in store now are timestamped from 0 - 7.
- TimerInternals.TimerData timer;
for (int i = 0; i < 8; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// fire the first 2 timers.
// timers in memory now are timestamped from 2 - 4;
// timers in store now are timestamped from 2 - 7.
Collection<KeyedTimerData<String>> readyTimers;
- timerInternalsFactory.setInputWatermark(new Instant(2));
+ timerInternalsFactory.setInputWatermark(new Instant(1));
long lastTimestamp = 0;
readyTimers = timerInternalsFactory.removeReadyTimers();
for (KeyedTimerData<String> keyedTimerData : readyTimers) {
@@ -549,10 +529,8 @@ public class SamzaTimerInternalsFactoryTest {
// timers in store now are timestamped from 2 - 19.
// the total number of timers to fire is 18.
for (int i = 8; i < 20; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
timerInternalsFactory.setInputWatermark(new Instant(20));
lastTimestamp = 0;
@@ -567,14 +545,8 @@ public class SamzaTimerInternalsFactoryTest {
store.close();
}
- /**
- * Test the total number of event time timers reloaded into memory is aligned with the number of
- * the event time timers written to the store. Moreover, event time timers reloaded into memory is
- * maintained in order, even though memory boundary is hit and timer is early than the last timer
- * in memory.
- */
@Test
- public void testEventTimeTimersMemoryBoundary4() {
+ public void testNewTimersAreInsertedInOrder() {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
pipelineOptions.setEventTimerBufferSize(5);
@@ -586,22 +558,19 @@ public class SamzaTimerInternalsFactoryTest {
final StateNamespace nameSpace = StateNamespaces.global();
final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
- // prepare 8 timers.
+ // prepare 10 timers.
// timers in memory now are timestamped from 0 - 4;
// timers in store now are timestamped from 0 - 9.
- TimerInternals.TimerData timer;
for (int i = 0; i < 10; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// fire the first 2 timers.
// timers in memory now are timestamped from 2 - 4;
// timers in store now are timestamped from 2 - 9.
Collection<KeyedTimerData<String>> readyTimers;
- timerInternalsFactory.setInputWatermark(new Instant(2));
+ timerInternalsFactory.setInputWatermark(new Instant(1));
long lastTimestamp = 0;
readyTimers = timerInternalsFactory.removeReadyTimers();
for (KeyedTimerData<String> keyedTimerData : readyTimers) {
@@ -611,16 +580,14 @@ public class SamzaTimerInternalsFactoryTest {
}
assertEquals(2, readyTimers.size());
- // add 3 timers.
+ // add 3 timers; timer2 already exists with the same timestamp, so it is dropped as a duplicate.
// timers in memory now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to
// 4 prefixed with timer, timestamp is in order;
// timers in store now are timestamped from 0 to 2 prefixed with lateTimer, and 2 to 9
// prefixed with timer, timestamp is in order;
for (int i = 0; i < 3; i++) {
- timer =
- TimerInternals.TimerData.of(
- "lateTimer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// there are 11 timers in state now.
@@ -647,15 +614,14 @@ public class SamzaTimerInternalsFactoryTest {
assertTrue(lastTimestamp <= currentTimeStamp);
lastTimestamp = currentTimeStamp;
}
- assertEquals(5, readyTimers.size());
+ assertEquals(4, readyTimers.size());
assertEquals(0, timerInternalsFactory.getEventTimeBuffer().size());
store.close();
}
- /** Test buffer could still be filled after restore to a non-full state. */
@Test
- public void testEventTimeTimersMemoryBoundary5() {
+ public void testBufferRefilledAfterRestoreToNonFullState() {
final SamzaPipelineOptions pipelineOptions =
PipelineOptionsFactory.create().as(SamzaPipelineOptions.class);
pipelineOptions.setEventTimerBufferSize(5);
@@ -670,37 +636,28 @@ public class SamzaTimerInternalsFactoryTest {
// prepare (buffer capacity + 1) 6 timers.
// timers in memory now are timestamped from 0 - 4;
// timer in store now is timestamped 6.
- TimerInternals.TimerData timer;
for (int i = 0; i < 6; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + i, nameSpace, new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// total number of event time timers to fire equals to the number of timers in store
Collection<KeyedTimerData<String>> readyTimers;
- timerInternalsFactory.setInputWatermark(new Instant(5));
+ timerInternalsFactory.setInputWatermark(new Instant(4));
readyTimers = timerInternalsFactory.removeReadyTimers();
assertEquals(5, readyTimers.size());
// reloaded timer5
assertEquals(1, timerInternalsFactory.getEventTimeBuffer().size());
- for (int i = 0; i < 7; i++) {
- timer =
- TimerInternals.TimerData.of(
- "timer" + (i + 6),
- nameSpace,
- new Instant(i + 6),
- new Instant(i + 6),
- TimeDomain.EVENT_TIME);
- timerInternals.setTimer(timer);
+ for (int i = 6; i < 13; i++) {
+ timerInternals.setTimer(
+ nameSpace, "timer" + i, "", new Instant(i), new Instant(i), TimeDomain.EVENT_TIME);
}
// timers should go into buffer not state
assertEquals(5, timerInternalsFactory.getEventTimeBuffer().size());
- // watermark 12 comes, so all timers will be evicted in order.
- timerInternalsFactory.setInputWatermark(new Instant(11));
+ // watermark 10 comes, so 6 timers will be evicted in order and 2 will remain in the buffer.
+ timerInternalsFactory.setInputWatermark(new Instant(10));
readyTimers = timerInternalsFactory.removeReadyTimers();
long lastTimestamp = 0;
for (KeyedTimerData<String> keyedTimerData : readyTimers) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
index 4fa342e..059abcf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteFilesTest.java
@@ -64,6 +64,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesUnboundedPCollections;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
@@ -329,7 +330,12 @@ public class WriteFilesTest {
}
@Test
- @Category({NeedsRunner.class, UsesUnboundedPCollections.class, UsesTestStream.class})
+ @Category({
+ NeedsRunner.class,
+ UsesUnboundedPCollections.class,
+ UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class
+ })
public void testWithRunnerDeterminedShardingTestStream() throws IOException {
List<String> elements = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
index 5ba8da1..316e258 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
import org.apache.beam.sdk.testing.UsesTestStream;
+import org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime;
import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -451,6 +452,7 @@ public class GroupIntoBatchesTest implements Serializable {
NeedsRunner.class,
UsesTimersInParDo.class,
UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class,
UsesStatefulParDo.class
})
public void testBufferingTimerInFixedWindow() {
@@ -573,6 +575,7 @@ public class GroupIntoBatchesTest implements Serializable {
NeedsRunner.class,
UsesTimersInParDo.class,
UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class,
UsesStatefulParDo.class
})
public void testBufferingTimerInGlobalWindow() {
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 4477151..25a574a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3994,6 +3994,7 @@ public class ParDoTest implements Serializable {
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
+ UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
public void testSimpleProcessingTimerTimer() throws Exception {
@@ -4165,6 +4166,7 @@ public class ParDoTest implements Serializable {
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
+ UsesTestStream.class,
UsesTestStreamWithProcessingTime.class
})
public void testProcessingTimeTimerCanBeReset() throws Exception {
@@ -4589,6 +4591,7 @@ public class ParDoTest implements Serializable {
ValidatesRunner.class,
UsesStatefulParDo.class,
UsesTimersInParDo.class,
+ UsesTestStream.class,
UsesTestStreamWithProcessingTime.class,
UsesTestStreamWithOutputTimestamp.class
})
@@ -4670,7 +4673,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
public void testRelativeTimerWithOutputTimestamp() {
DoFn<KV<Void, String>, String> buffferFn =
new DoFn<KV<Void, String>, String>() {
@@ -4854,7 +4857,12 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ @Category({
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class
+ })
public void testSetAndClearProcessingTimeTimer() {
final String timerId = "processing-timer";
@@ -4929,7 +4937,12 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ @Category({
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class
+ })
public void testClearUnsetProcessingTimeTimer() {
final String timerId = "processing-timer";
@@ -5001,7 +5014,12 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ @Category({
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class
+ })
public void testClearProcessingTimeTimer() {
final String timerId = "processing-timer";
final String clearTimerId = "clear-timer";
@@ -5111,7 +5129,12 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ @Category({
+ NeedsRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesTestStreamWithProcessingTime.class
+ })
public void testSetProcessingTimerAfterClear() {
final String timerId = "processing-timer";
@@ -5745,6 +5768,7 @@ public class ParDoTest implements Serializable {
@Category({
ValidatesRunner.class,
UsesTimersInParDo.class,
+ UsesTestStream.class,
UsesTestStreamWithProcessingTime.class,
UsesTimerMap.class
})