You are viewing a plain text version of this content. The canonical link to the original was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by re...@apache.org on 2020/05/16 06:40:18 UTC
[beam] branch master updated: Merge pull request #11725:
[BEAM-10015] Fix output timestamp on dataflow runner
This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c395c84 Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner
c395c84 is described below
commit c395c84943fbd4875d2021f0cbf744e982c9fe31
Author: reuvenlax <re...@google.com>
AuthorDate: Fri May 15 23:39:53 2020 -0700
Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner
---
.../operators/ApexTimerInternalsTest.java | 6 +-
.../beam/runners/core/ReduceFnContextFactory.java | 4 +-
.../core/SplittableParDoViaKeyedWorkItems.java | 3 +-
.../apache/beam/runners/core/TimerInternals.java | 32 +----
.../runners/core/InMemoryTimerInternalsTest.java | 38 ++++--
.../beam/runners/core/KeyedWorkItemCoderTest.java | 6 +-
.../apache/beam/runners/core/ReduceFnTester.java | 4 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 1 +
.../SimplePushbackSideInputDoFnRunnerTest.java | 1 +
.../beam/runners/core/TimerInternalsTest.java | 37 ++++--
.../core/triggers/TriggerStateMachineTester.java | 4 +-
.../runners/direct/DirectTimerInternalsTest.java | 26 +++-
...cycleManagerRemovingTransformEvaluatorTest.java | 7 +-
.../beam/runners/direct/EvaluationContextTest.java | 3 +-
.../beam/runners/direct/WatermarkManagerTest.java | 148 ++++++++++++++++-----
.../streaming/ExecutableStageDoFnOperatorTest.java | 2 +
.../dataflow/worker/WindmillTimerInternals.java | 137 ++++++++++++++-----
.../worker/StreamingGroupAlsoByWindowFnsTest.java | 1 +
...eamingKeyedWorkItemSideInputDoFnRunnerTest.java | 1 +
.../worker/StreamingModeExecutionContextTest.java | 6 +-
.../worker/StreamingSideInputFetcherTest.java | 1 +
.../dataflow/worker/UserParDoFnFactoryTest.java | 2 +
.../dataflow/worker/WindmillKeyedWorkItemTest.java | 3 +-
.../worker/WindmillTimerInternalsTest.java | 48 ++++---
.../apache/beam/runners/samza/runtime/DoFnOp.java | 1 +
.../beam/runners/samza/runtime/KeyedTimerData.java | 2 +-
.../samza/runtime/SamzaTimerInternalsFactory.java | 13 +-
.../runners/samza/runtime/KeyedTimerDataTest.java | 2 +-
.../runtime/SamzaTimerInternalsFactoryTest.java | 25 ++--
.../java/org/apache/beam/sdk/transforms/DoFn.java | 4 +-
.../org/apache/beam/sdk/transforms/ParDoTest.java | 38 ++++++
31 files changed, 439 insertions(+), 167 deletions(-)
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 180d2fe..f4e8d10 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -128,7 +128,11 @@ public class ApexTimerInternalsTest {
TimerDataCoderV2 timerDataCoder = TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
TimerData timerData =
TimerData.of(
- "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME);
+ "arbitrary-id",
+ StateNamespaces.global(),
+ new Instant(0),
+ new Instant(0),
+ TimeDomain.EVENT_TIME);
String key = "key";
ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index ac6185b..8bdcc87 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -132,12 +132,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+ timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+ timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}
@Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 01612f0..3ef996f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -545,7 +545,8 @@ public class SplittableParDoViaKeyedWorkItems {
holdState.add(futureOutputWatermark);
// Set a timer to continue processing this element.
timerInternals.setTimer(
- TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+ TimerInternals.TimerData.of(
+ stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
}
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle(
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 5ea3fb7..413884d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -214,27 +214,6 @@ public interface TimerInternals {
}
/**
- * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
- * generated. Construct a {@link TimerData} for the given parameters except for {@code
- * outputTimestamp}. {@code outputTimestamp} is set to timer {@code timestamp}.
- */
- public static TimerData of(
- String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- return new AutoValue_TimerInternals_TimerData(
- timerId, "", namespace, timestamp, timestamp, domain);
- }
-
- public static TimerData of(
- String timerId,
- String timerFamilyId,
- StateNamespace namespace,
- Instant timestamp,
- TimeDomain domain) {
- return new AutoValue_TimerInternals_TimerData(
- timerId, timerFamilyId, namespace, timestamp, timestamp, domain);
- }
-
- /**
* Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
* deterministically generated from the {@code timestamp} and {@code domain}.
*/
@@ -250,15 +229,6 @@ public interface TimerInternals {
}
/**
- * Construct a {@link TimerData} for the given parameters, where the timer ID is
- * deterministically generated from the {@code timestamp} and {@code domain}. Also, output
- * timestamp is set to the timer timestamp by default.
- */
- public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- return of(namespace, timestamp, timestamp, domain);
- }
-
- /**
* {@inheritDoc}.
*
* <p>Used for sorting {@link TimerData} by timestamp. Furthermore, we compare timers by all the
@@ -364,7 +334,7 @@ public interface TimerInternals {
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
- return TimerData.of(timerId, namespace, timestamp, domain);
+ return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
}
@Override
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 3ab7932..bc997c3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -39,8 +39,10 @@ public class InMemoryTimerInternalsTest {
@Test
public void testFiringEventTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME);
- TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME);
+ TimerData eventTimer1 =
+ TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData eventTimer2 =
+ TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
underTest.setTimer(eventTimer1);
underTest.setTimer(eventTimer2);
@@ -79,7 +81,7 @@ public class InMemoryTimerInternalsTest {
underTest.advanceInputWatermark(laterTimestamp.plus(1L));
assertThat(
underTest.removeNextEventTimer(),
- equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, TimeDomain.EVENT_TIME)));
+ equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, laterTimestamp, TimeDomain.EVENT_TIME)));
}
@Test
@@ -107,8 +109,10 @@ public class InMemoryTimerInternalsTest {
@Test
public void testFiringProcessingTimeTimers() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+ TimerData processingTime1 =
+ TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData processingTime2 =
+ TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
underTest.setTimer(processingTime1);
underTest.setTimer(processingTime2);
@@ -136,14 +140,20 @@ public class InMemoryTimerInternalsTest {
@Test
public void testTimerOrdering() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
- TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData eventTime1 =
+ TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime1 =
+ TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTime1 =
- TimerData.of(NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
- TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
- TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ TimerData eventTime2 =
+ TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
+ TimerData processingTime2 =
+ TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTime2 =
- TimerData.of(NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ TimerData.of(
+ NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
underTest.setTimer(processingTime1);
underTest.setTimer(eventTime1);
@@ -176,8 +186,10 @@ public class InMemoryTimerInternalsTest {
@Test
public void testDeduplicate() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
- TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
- TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+ TimerData eventTime =
+ TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+ TimerData processingTime =
+ TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
underTest.setTimer(eventTime);
underTest.setTimer(eventTime);
underTest.setTimer(processingTime);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
index 1aebb03..5de5749 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -43,7 +43,11 @@ public class KeyedWorkItemCoderTest {
public void testEncodeDecodeEqual() throws Exception {
Iterable<TimerData> timers =
ImmutableList.of(
- TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(500L),
+ new Instant(500L),
+ TimeDomain.EVENT_TIME));
Iterable<WindowedValue<Integer>> elements =
ImmutableList.of(
WindowedValue.valueInGlobalWindow(1),
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f316d07..0e2e9a3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -570,7 +570,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
ArrayList<TimerData> timers = new ArrayList<>(1);
timers.add(
- TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+ TimerData.of(
+ StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
runner.onTimers(timers);
runner.persist();
}
@@ -583,6 +584,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
TimerData.of(
StateNamespaces.window(windowFn.windowCoder(), window),
timer.getTimestamp(),
+ timer.getTimestamp(),
timer.getValue()));
}
runner.onTimers(timerData);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index a1c3d72..9daf372 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -441,6 +441,7 @@ public class SimpleDoFnRunnerTest {
TimerData.of(
DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowCoder, (W) context.window()),
+ context.fireTimestamp(),
context.timestamp(),
context.timeDomain()));
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index f9d4296..a0a6102 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -318,6 +318,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
timerId,
StateNamespaces.window(IntervalWindow.getCoder(), window),
timestamp,
+ timestamp,
TimeDomain.EVENT_TIME)));
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
index ab2978f..c6a36bd 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
@@ -42,7 +42,11 @@ public class TimerInternalsTest {
CoderProperties.coderDecodeEncodeEqual(
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE),
TimerData.of(
- "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
+ "arbitrary-id",
+ StateNamespaces.global(),
+ new Instant(0),
+ new Instant(0),
+ TimeDomain.EVENT_TIME));
Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
CoderProperties.coderDecodeEncodeEqual(
@@ -52,6 +56,7 @@ public class TimerInternalsTest {
StateNamespaces.window(
windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
new Instant(99),
+ new Instant(99),
TimeDomain.PROCESSING_TIME));
}
@@ -64,10 +69,12 @@ public class TimerInternalsTest {
public void testCompareEqual() {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();
- TimerData timer = TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME);
+ TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
assertThat(
- timer, comparesEqualTo(TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME)));
+ timer,
+ comparesEqualTo(
+ TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME)));
}
@Test
@@ -76,8 +83,10 @@ public class TimerInternalsTest {
Instant secondTimestamp = new Instant(200);
StateNamespace namespace = StateNamespaces.global();
- TimerData firstTimer = TimerData.of(namespace, firstTimestamp, TimeDomain.EVENT_TIME);
- TimerData secondTimer = TimerData.of(namespace, secondTimestamp, TimeDomain.EVENT_TIME);
+ TimerData firstTimer =
+ TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME);
+ TimerData secondTimer =
+ TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME);
assertThat(firstTimer, lessThan(secondTimer));
}
@@ -87,10 +96,10 @@ public class TimerInternalsTest {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();
- TimerData eventTimer = TimerData.of(namespace, timestamp, TimeDomain.EVENT_TIME);
- TimerData procTimer = TimerData.of(namespace, timestamp, TimeDomain.PROCESSING_TIME);
+ TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
+ TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcTimer =
- TimerData.of(namespace, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
assertThat(eventTimer, lessThan(procTimer));
assertThat(eventTimer, lessThan(synchronizedProcTimer));
@@ -107,8 +116,10 @@ public class TimerInternalsTest {
StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow);
StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow);
- TimerData secondEventTime = TimerData.of(firstWindowNs, timestamp, TimeDomain.EVENT_TIME);
- TimerData thirdEventTime = TimerData.of(secondWindowNs, timestamp, TimeDomain.EVENT_TIME);
+ TimerData secondEventTime =
+ TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
+ TimerData thirdEventTime =
+ TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
assertThat(secondEventTime, lessThan(thirdEventTime));
}
@@ -118,8 +129,10 @@ public class TimerInternalsTest {
Instant timestamp = new Instant(100);
StateNamespace namespace = StateNamespaces.global();
- TimerData id0Timer = TimerData.of("id0", namespace, timestamp, TimeDomain.EVENT_TIME);
- TimerData id1Timer = TimerData.of("id1", namespace, timestamp, TimeDomain.EVENT_TIME);
+ TimerData id0Timer =
+ TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
+ TimerData id1Timer =
+ TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
assertThat(id0Timer, lessThan(id1Timer));
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 21f2d13..bd59f87 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -413,12 +413,12 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
@Override
public void setTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+ timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}
@Override
public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
- timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+ timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
}
@Override
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
index dfec1f1..5bd0d39 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
@@ -62,13 +62,22 @@ public class DirectTimerInternalsTest {
@Test
public void setTimerAddsToBuilder() {
TimerData eventTimer =
- TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(20145L),
+ new Instant(20145L),
+ TimeDomain.EVENT_TIME);
TimerData processingTimer =
- TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(125555555L),
+ new Instant(125555555L),
+ TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTimer =
TimerData.of(
StateNamespaces.global(),
new Instant(98745632189L),
+ new Instant(98745632189L),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
internals.setTimer(eventTimer);
internals.setTimer(processingTimer);
@@ -82,13 +91,22 @@ public class DirectTimerInternalsTest {
@Test
public void deleteTimerDeletesOnBuilder() {
TimerData eventTimer =
- TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(20145L),
+ new Instant(20145L),
+ TimeDomain.EVENT_TIME);
TimerData processingTimer =
- TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(125555555L),
+ new Instant(125555555L),
+ TimeDomain.PROCESSING_TIME);
TimerData synchronizedProcessingTimer =
TimerData.of(
StateNamespaces.global(),
new Instant(98745632189L),
+ new Instant(98745632189L),
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
internals.deleteTimer(eventTimer);
internals.deleteTimer(processingTimer);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 3f10123..3e7871b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -104,7 +104,12 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
try {
evaluator.onTimer(
- TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
+ TimerData.of(
+ "foo",
+ StateNamespaces.global(),
+ new Instant(0),
+ new Instant(0),
+ TimeDomain.EVENT_TIME),
"",
GlobalWindow.INSTANCE);
} catch (Exception e) {
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index b2262af..055b48e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -316,7 +316,8 @@ public class EvaluationContextTest {
StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
TimerData toFire =
- TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
TransformResult<?> timerResult =
StepTransformResult.withoutHold(downstreamProducer)
.withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index eef43c7..6515e22 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -990,9 +990,17 @@ public class WatermarkManagerTest implements Serializable {
StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
TimerData pastTimer =
- TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(250L),
+ new Instant(250L),
+ TimeDomain.PROCESSING_TIME);
TimerData futureTimer =
- TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(4096L),
+ new Instant(4096L),
+ TimeDomain.PROCESSING_TIME);
TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
manager.updateWatermarks(
createdBundle,
@@ -1125,7 +1133,8 @@ public class WatermarkManagerTest implements Serializable {
CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
Instant upstreamHold = new Instant(2048L);
TimerData upstreamProcessingTimer =
- TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME);
manager.updateWatermarks(
created,
TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
@@ -1214,11 +1223,20 @@ public class WatermarkManagerTest implements Serializable {
manager.refreshAll();
TimerData earliestTimer =
- TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(), new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME);
TimerData middleTimer =
- TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(5000L),
+ new Instant(5000L),
+ TimeDomain.EVENT_TIME);
TimerData lastTimer =
- TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(10000L),
+ new Instant(10000L),
+ TimeDomain.EVENT_TIME);
StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
@@ -1290,11 +1308,23 @@ public class WatermarkManagerTest implements Serializable {
new Instant(1500L));
TimerData earliestTimer =
- TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(999L),
+ new Instant(999L),
+ TimeDomain.PROCESSING_TIME);
TimerData middleTimer =
- TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(5000L),
+ new Instant(5000L),
+ TimeDomain.PROCESSING_TIME);
TimerData lastTimer =
- TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(10000L),
+ new Instant(10000L),
+ TimeDomain.PROCESSING_TIME);
StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
@@ -1367,13 +1397,22 @@ public class WatermarkManagerTest implements Serializable {
TimerData earliestTimer =
TimerData.of(
- StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ StateNamespaces.global(),
+ new Instant(999L),
+ new Instant(999L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData middleTimer =
TimerData.of(
- StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ StateNamespaces.global(),
+ new Instant(5000L),
+ new Instant(5000L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData lastTimer =
TimerData.of(
- StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ StateNamespaces.global(),
+ new Instant(10000L),
+ new Instant(10000L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of());
TimerUpdate update =
TimerUpdate.builder(key)
@@ -1436,11 +1475,19 @@ public class WatermarkManagerTest implements Serializable {
TimerData initialTimer =
TimerData.of(
- timerId, StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
+ timerId,
+ StateNamespaces.global(),
+ new Instant(5000L),
+ new Instant(5000L),
+ TimeDomain.PROCESSING_TIME);
TimerData overridingTimer =
TimerData.of(
- timerId, StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
+ timerId,
+ StateNamespaces.global(),
+ new Instant(10000L),
+ new Instant(10000L),
+ TimeDomain.PROCESSING_TIME);
TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
@@ -1483,9 +1530,19 @@ public class WatermarkManagerTest implements Serializable {
StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
TimerData initialTimer =
- TimerData.of(timerId, StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ timerId,
+ StateNamespaces.global(),
+ new Instant(1000L),
+ new Instant(1000L),
+ TimeDomain.EVENT_TIME);
TimerData overridingTimer =
- TimerData.of(timerId, StateNamespaces.global(), new Instant(2000L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ timerId,
+ StateNamespaces.global(),
+ new Instant(2000L),
+ new Instant(2000L),
+ TimeDomain.EVENT_TIME);
TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
@@ -1543,9 +1600,19 @@ public class WatermarkManagerTest implements Serializable {
// Apply a timer update
StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
TimerData timer1 =
- TimerData.of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ "a",
+ StateNamespaces.global(),
+ new Instant(100),
+ new Instant(100),
+ TimeDomain.EVENT_TIME);
TimerData timer2 =
- TimerData.of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ "a",
+ StateNamespaces.global(),
+ new Instant(200),
+ new Instant(200),
+ TimeDomain.EVENT_TIME);
underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
// Only the last timer update should be observable
@@ -1575,14 +1642,27 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderBuildAddsAllAddedTimers() {
- TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
+ TimerData set =
+ TimerData.of(
+ StateNamespaces.global(), new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
TimerData deleted =
- TimerData.of(StateNamespaces.global(), new Instant(24L), TimeDomain.PROCESSING_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(24L),
+ new Instant(24L),
+ TimeDomain.PROCESSING_TIME);
TimerData completedOne =
TimerData.of(
- StateNamespaces.global(), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ StateNamespaces.global(),
+ new Instant(1024L),
+ new Instant(1024L),
+ TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
TimerData completedTwo =
- TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
+ TimerData.of(
+ StateNamespaces.global(),
+ new Instant(2048L),
+ new Instant(2048L),
+ TimeDomain.EVENT_TIME);
TimerUpdate update =
TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of()))
@@ -1599,7 +1679,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithSetAtEndOfTime() {
Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
- TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
+ TimerData tooFar =
+ TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME);
TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
thrown.expect(IllegalArgumentException.class);
@@ -1610,7 +1691,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithSetPastEndOfTime() {
Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2));
- TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
+ TimerData tooFar =
+ TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME);
TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
thrown.expect(IllegalArgumentException.class);
@@ -1621,7 +1703,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build();
@@ -1632,7 +1715,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build();
@@ -1643,7 +1727,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.build();
builder.setTimer(timer);
@@ -1655,7 +1740,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.build();
builder.deletedTimer(timer);
@@ -1667,7 +1753,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.build();
builder.withCompletedTimers(ImmutableList.of(timer));
@@ -1679,7 +1766,8 @@ public class WatermarkManagerTest implements Serializable {
@Test
public void timerUpdateWithCompletedTimersNotAddedToExisting() {
TimerUpdateBuilder builder = TimerUpdate.builder(null);
- TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+ Instant now = Instant.now();
+ TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
TimerUpdate built = builder.build();
assertThat(built.getCompletedTimers(), emptyIterable());
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 932fac8..efea477 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -759,6 +759,7 @@ public class ExecutableStageDoFnOperatorTest {
timerInputKey.getKey(), timerInputKey.getValue()),
stateNamespace,
window.maxTimestamp(),
+ window.maxTimestamp(),
TimeDomain.EVENT_TIME);
timerInternals.setTimer(userTimer);
@@ -793,6 +794,7 @@ public class ExecutableStageDoFnOperatorTest {
timerInputKey.getKey(), timerInputKey.getValue()),
stateNamespace,
window.maxTimestamp(),
+ window.maxTimestamp(),
TimeDomain.EVENT_TIME);
operator.setTimer(
Timer.of(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 5d92d2a..c8416c4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -29,6 +31,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
+import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
@@ -278,27 +283,77 @@ class WindmillTimerInternals implements TimerInternals {
// - the GlobalWindow is currently encoded in zero bytes, so it becomes "//"
// - the Global StateNamespace is different, and becomes "/"
// - the id is totally arbitrary; currently unescaped though that could change
- String tag = timer.getTag().toStringUtf8();
+
+ ByteString tag = timer.getTag();
checkArgument(
- timer.getTag().startsWith(prefix.byteString()),
+ tag.startsWith(prefix.byteString()),
"Expected timer tag %s to start with prefix %s",
tag,
prefix.byteString());
+
+ Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+
+ // Parse the namespace.
int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash
- int namespaceEnd = tag.indexOf('+', namespaceStart); // keep the end slash, drop the +
- String namespaceString = tag.substring(namespaceStart, namespaceEnd);
- String timerIdPlusTimerFamilyId = tag.substring(namespaceEnd + 1); // timerId+timerFamilyId
- int timerIdEnd = timerIdPlusTimerFamilyId.indexOf('+'); // end of timerId
- // if no '+' found then timerFamilyId is empty string else they have a '+' separator
- String familyId = timerIdEnd == -1 ? "" : timerIdPlusTimerFamilyId.substring(timerIdEnd + 1);
- String id =
- timerIdEnd == -1
- ? timerIdPlusTimerFamilyId
- : timerIdPlusTimerFamilyId.substring(0, timerIdEnd);
+ int namespaceEnd = namespaceStart;
+ while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
+ namespaceEnd++;
+ }
+ String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8();
+
+ // Parse the timer id.
+ int timerIdStart = namespaceEnd + 1;
+ int timerIdEnd = timerIdStart;
+ while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') {
+ timerIdEnd++;
+ }
+ String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8();
+
+ // Parse the timer family.
+ int timerFamilyStart = timerIdEnd + 1;
+ int timerFamilyEnd = timerFamilyStart;
+ while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') {
+ timerFamilyEnd++;
+ }
+ // For backwards compatibility, handle the case where the timer family isn't present.
+ String timerFamily =
+ (timerFamilyStart < tag.size())
+ ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
+ : "";
+
+ // Parse the output timestamp.
+ int outputTimestampStart = timerFamilyEnd + 1;
+ int outputTimestampEnd = outputTimestampStart;
+ while (outputTimestampEnd < tag.size() && tag.byteAt(outputTimestampEnd) != '+') {
+ outputTimestampEnd++;
+ }
+
+ // For backwards compatibility, handle the case where the output timestamp isn't present.
+ Instant outputTimestamp = timestamp;
+ if ((outputTimestampStart < tag.size())) {
+ try {
+ outputTimestamp =
+ new Instant(
+ VarInt.decodeLong(
+ tag.substring(outputTimestampStart, outputTimestampEnd).newInput()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
- Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+ return TimerData.of(
+ timerId,
+ timerFamily,
+ namespace,
+ timestamp,
+ outputTimestamp,
+ timerTypeToTimeDomain(timer.getType()));
+ }
- return TimerData.of(id, familyId, namespace, timestamp, timerTypeToTimeDomain(timer.getType()));
+ private static boolean useNewTimerTagEncoding(TimerData timerData) {
+ return !timerData.getTimerFamilyId().isEmpty()
+ || !timerData.getOutputTimestamp().equals(timerData.getTimestamp());
}
/**
@@ -309,27 +364,41 @@ class WindmillTimerInternals implements TimerInternals {
*/
public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
String tagString;
- // Timers without timerFamily would have timerFamily would be an empty string
- if ("".equals(timerData.getTimerFamilyId())) {
- tagString =
- new StringBuilder()
- .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
- .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
- .append('+')
- .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
- .toString();
- } else {
- tagString =
- new StringBuilder()
- .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
- .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
- .append('+')
- .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
- .append('+')
- .append(timerData.getTimerFamilyId())
- .toString();
+ ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
+ try {
+ if (useNewTimerTagEncoding(timerData)) {
+ tagString =
+ new StringBuilder()
+ .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
+ .append(
+ timerData.getNamespace().stringKey()) // this must begin and end with a slash
+ .append('+')
+ .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
+ .append('+')
+ .append(timerData.getTimerFamilyId())
+ .toString();
+ out.write(tagString.getBytes(StandardCharsets.UTF_8));
+ // Only encode the extra 9 bytes if the output timestamp differs from the timestamp.
+ if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
+ out.write('+');
+ VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
+ }
+ } else {
+ // Timers without a timerFamily have an empty string as their timerFamily
+ tagString =
+ new StringBuilder()
+ .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
+ .append(
+ timerData.getNamespace().stringKey()) // this must begin and end with a slash
+ .append('+')
+ .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
+ .toString();
+ out.write(tagString.getBytes(StandardCharsets.UTF_8));
+ }
+ return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- return ByteString.copyFromUtf8(tagString);
}
/**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index bf9e875..0d5a883 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -149,6 +149,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
TimerData.of(
namespace,
timestamp,
+ timestamp,
type == Windmill.Timer.Type.WATERMARK
? TimeDomain.EVENT_TIME
: TimeDomain.PROCESSING_TIME)))
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
index 790a7d5..5d6f1e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
@@ -164,6 +164,7 @@ public class StreamingKeyedWorkItemSideInputDoFnRunnerTest {
return TimerData.of(
StateNamespaces.window(IntervalWindow.getCoder(), window),
timestamp,
+ timestamp,
type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index ff957c1..d5071d7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -143,7 +143,11 @@ public class StreamingModeExecutionContextTest {
TimerInternals timerInternals = stepContext.timerInternals();
timerInternals.setTimer(
- TimerData.of(new StateNamespaceForTest("key"), new Instant(5000), TimeDomain.EVENT_TIME));
+ TimerData.of(
+ new StateNamespaceForTest("key"),
+ new Instant(5000),
+ new Instant(5000),
+ TimeDomain.EVENT_TIME));
executionContext.flushState();
Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
index c1a9945..c9a4227 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
@@ -203,6 +203,7 @@ public class StreamingSideInputFetcherTest {
return TimerData.of(
StateNamespaces.window(IntervalWindow.getCoder(), createWindow(timestamp)),
new Instant(timestamp),
+ new Instant(timestamp),
TimeDomain.EVENT_TIME);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index 5e688f3..45889df 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -426,6 +426,7 @@ public class UserParDoFnFactoryTest {
SimpleParDoFn.CLEANUP_TIMER_ID,
firstWindowNamespace,
firstWindow.maxTimestamp().plus(1L),
+ firstWindow.maxTimestamp().plus(1L),
TimeDomain.EVENT_TIME))
.thenReturn(null);
@@ -441,6 +442,7 @@ public class UserParDoFnFactoryTest {
SimpleParDoFn.CLEANUP_TIMER_ID,
secondWindowNamespace,
secondWindow.maxTimestamp().plus(1L),
+ secondWindow.maxTimestamp().plus(1L),
TimeDomain.EVENT_TIME))
.thenReturn(null);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index de62567..40d4c74 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -166,6 +166,7 @@ public class WindmillKeyedWorkItemTest {
TimerData.of(
ns,
new Instant(timestamp),
+ new Instant(timestamp),
WindmillTimerInternals.timerTypeToTimeDomain(type))))
.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp)))
.setType(type)
@@ -174,7 +175,7 @@ public class WindmillKeyedWorkItemTest {
}
private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain domain) {
- return TimerData.of(ns, new Instant(timestamp), domain);
+ return TimerData.of(ns, new Instant(timestamp), new Instant(timestamp), domain);
}
@Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index d0f0f8a..3c1f6bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -79,25 +79,43 @@ public class WindmillTimerInternalsTest {
for (TimeDomain timeDomain : TimeDomain.values()) {
for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) {
for (Instant timestamp : TEST_TIMESTAMPS) {
- TimerData anonymousTimerData = TimerData.of(namespace, timestamp, timeDomain);
-
- assertThat(
- WindmillTimerInternals.windmillTimerToTimerData(
- prefix,
- WindmillTimerInternals.timerDataToWindmillTimer(
- stateFamily, prefix, anonymousTimerData),
- coder),
- equalTo(anonymousTimerData));
-
- for (String timerId : TEST_TIMER_IDS) {
- TimerData timerData = TimerData.of(timerId, namespace, timestamp, timeDomain);
+ List<TimerData> anonymousTimers =
+ ImmutableList.of(
+ TimerData.of(namespace, timestamp, timestamp, timeDomain),
+ TimerData.of(namespace, timestamp, timestamp.minus(1), timeDomain));
+ for (TimerData timer : anonymousTimers) {
assertThat(
WindmillTimerInternals.windmillTimerToTimerData(
prefix,
- WindmillTimerInternals.timerDataToWindmillTimer(
- stateFamily, prefix, timerData),
+ WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
coder),
- equalTo(timerData));
+ equalTo(timer));
+ }
+
+ for (String timerId : TEST_TIMER_IDS) {
+ List<TimerData> timers =
+ ImmutableList.of(
+ TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain),
+ TimerData.of(
+ timerId, "family", namespace, timestamp, timestamp, timeDomain),
+ TimerData.of(timerId, namespace, timestamp, timestamp.minus(1), timeDomain),
+ TimerData.of(
+ timerId,
+ "family",
+ namespace,
+ timestamp,
+ timestamp.minus(1),
+ timeDomain));
+
+ for (TimerData timer : timers) {
+ assertThat(
+ WindmillTimerInternals.windmillTimerToTimerData(
+ prefix,
+ WindmillTimerInternals.timerDataToWindmillTimer(
+ stateFamily, prefix, timer),
+ coder),
+ equalTo(timer));
+ }
}
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index b9d6216..4dd4441 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -277,6 +277,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
bundleCheckTimerId,
StateNamespaces.global(),
nextBundleCheckTime,
+ nextBundleCheckTime,
TimeDomain.PROCESSING_TIME);
bundleTimerScheduler.schedule(
new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index a6214be..6a51967 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -160,7 +160,7 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
final StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
final TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
- final TimerData timer = TimerData.of(timerId, namespace, timestamp, domain);
+ final TimerData timer = TimerData.of(timerId, namespace, timestamp, timestamp, domain);
byte[] keyBytes = null;
K key = null;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 22c2ac9..e5ba2d3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -218,6 +218,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
timerData.getTimerId(),
timerData.getNamespace(),
new Instant(lastTimestamp),
+ new Instant(lastTimestamp),
timerData.getDomain());
deleteTimer(lastTimerData, false);
}
@@ -244,12 +245,14 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
@Override
public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- deleteTimer(TimerData.of(timerId, namespace, Instant.now(), timeDomain));
+ Instant now = Instant.now();
+ deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain));
}
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
- deleteTimer(TimerData.of(timerId, namespace, Instant.now(), TimeDomain.EVENT_TIME));
+ Instant now = Instant.now();
+ deleteTimer(TimerData.of(timerId, namespace, now, now, TimeDomain.EVENT_TIME));
}
@Override
@@ -455,7 +458,11 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
keyBytes,
timerKey.key,
TimerInternals.TimerData.of(
- timerKey.timerId, timerKey.stateNamespace, new Instant(timestamp), domain));
+ timerKey.timerId,
+ timerKey.stateNamespace,
+ new Instant(timestamp),
+ new Instant(timestamp),
+ domain));
}
private TimerKey(K key, StateNamespace stateNamespace, String timerId) {
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index e521774..c16b4ce 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -36,7 +36,7 @@ public class KeyedTimerDataTest {
public void testCoder() throws Exception {
final TimerInternals.TimerData td =
TimerInternals.TimerData.of(
- "timer", StateNamespaces.global(), new Instant(), TimeDomain.EVENT_TIME);
+ "timer", StateNamespaces.global(), new Instant(), new Instant(), TimeDomain.EVENT_TIME);
final String key = "timer-key";
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 0f536c3..d0d6126 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -144,11 +144,13 @@ public class SamzaTimerInternalsFactoryTest {
final StateNamespace nameSpace = StateNamespaces.global();
final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
final TimerInternals.TimerData timer1 =
- TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer1);
final TimerInternals.TimerData timer2 =
- TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer2);
timerInternalsFactory.setInputWatermark(new Instant(5));
@@ -181,11 +183,13 @@ public class SamzaTimerInternalsFactoryTest {
final StateNamespace nameSpace = StateNamespaces.global();
final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
final TimerInternals.TimerData timer1 =
- TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer1);
final TimerInternals.TimerData timer2 =
- TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer2);
store.close();
@@ -226,12 +230,12 @@ public class SamzaTimerInternalsFactoryTest {
final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
final TimerInternals.TimerData timer1 =
TimerInternals.TimerData.of(
- "timer1", nameSpace, new Instant(10), TimeDomain.PROCESSING_TIME);
+ "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME);
timerInternals.setTimer(timer1);
final TimerInternals.TimerData timer2 =
TimerInternals.TimerData.of(
- "timer2", nameSpace, new Instant(100), TimeDomain.PROCESSING_TIME);
+ "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME);
timerInternals.setTimer(timer2);
assertEquals(2, timerRegistry.timers.size());
@@ -267,16 +271,19 @@ public class SamzaTimerInternalsFactoryTest {
final StateNamespace nameSpace = StateNamespaces.global();
final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
final TimerInternals.TimerData timer1 =
- TimerInternals.TimerData.of("timerId", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timerId", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer1);
// this timer should override the first timer
final TimerInternals.TimerData timer2 =
- TimerInternals.TimerData.of("timerId", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timerId", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer2);
final TimerInternals.TimerData timer3 =
- TimerInternals.TimerData.of("timerId2", nameSpace, new Instant(200), TimeDomain.EVENT_TIME);
+ TimerInternals.TimerData.of(
+ "timerId2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME);
timerInternals.setTimer(timer3);
// this timer shouldn't override since it has a different id
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index c4df77b..c673cbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -270,10 +270,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
@Experimental(Kind.TIMERS)
public abstract class OnTimerContext extends WindowedContext {
- /** Returns the timestamp of the current timer. */
+ /** Returns the output timestamp of the current timer. */
public abstract Instant timestamp();
- /** Returns the output timestamp of the current timer. */
+ /** Returns the firing timestamp of the current timer. */
public abstract Instant fireTimestamp();
/** Returns the window in which the timer is firing. */
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index c71e963..bfae241 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3541,6 +3541,44 @@ public class ParDoTest implements Serializable {
UsesTimersInParDo.class,
DataflowPortabilityApiUnsupported.class
})
+ public void testOutputTimestampDefault() throws Exception {
+ final String timerId = "foo";
+ DoFn<KV<String, Long>, Long> fn1 =
+ new DoFn<KV<String, Long>, Long>() {
+
+ @TimerId(timerId)
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(
+ @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) {
+ timer
+ .withOutputTimestamp(timestamp.plus(Duration.millis(5)))
+ .set(timestamp.plus(Duration.millis(10)));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(@Timestamp Instant timestamp, OutputReceiver<Long> o) {
+ o.output(timestamp.getMillis());
+ }
+ };
+
+ PCollection<Long> output =
+ pipeline
+ .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3))))
+ .setIsBoundedInternal(IsBounded.UNBOUNDED)
+ .apply("first", ParDo.of(fn1));
+
+ PAssert.that(output).containsInAnyOrder(new Instant(8).getMillis()); // result output
+ pipeline.run();
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ DataflowPortabilityApiUnsupported.class
+ })
public void testOutOfBoundsEventTimeTimerHold() throws Exception {
final String timerId = "foo";