You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2020/05/16 06:40:18 UTC

[beam] branch master updated: Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c395c84  Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner
c395c84 is described below

commit c395c84943fbd4875d2021f0cbf744e982c9fe31
Author: reuvenlax <re...@google.com>
AuthorDate: Fri May 15 23:39:53 2020 -0700

    Merge pull request #11725: [BEAM-10015] Fix output timestamp on dataflow runner
---
 .../operators/ApexTimerInternalsTest.java          |   6 +-
 .../beam/runners/core/ReduceFnContextFactory.java  |   4 +-
 .../core/SplittableParDoViaKeyedWorkItems.java     |   3 +-
 .../apache/beam/runners/core/TimerInternals.java   |  32 +----
 .../runners/core/InMemoryTimerInternalsTest.java   |  38 ++++--
 .../beam/runners/core/KeyedWorkItemCoderTest.java  |   6 +-
 .../apache/beam/runners/core/ReduceFnTester.java   |   4 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java    |   1 +
 .../SimplePushbackSideInputDoFnRunnerTest.java     |   1 +
 .../beam/runners/core/TimerInternalsTest.java      |  37 ++++--
 .../core/triggers/TriggerStateMachineTester.java   |   4 +-
 .../runners/direct/DirectTimerInternalsTest.java   |  26 +++-
 ...cycleManagerRemovingTransformEvaluatorTest.java |   7 +-
 .../beam/runners/direct/EvaluationContextTest.java |   3 +-
 .../beam/runners/direct/WatermarkManagerTest.java  | 148 ++++++++++++++++-----
 .../streaming/ExecutableStageDoFnOperatorTest.java |   2 +
 .../dataflow/worker/WindmillTimerInternals.java    | 137 ++++++++++++++-----
 .../worker/StreamingGroupAlsoByWindowFnsTest.java  |   1 +
 ...eamingKeyedWorkItemSideInputDoFnRunnerTest.java |   1 +
 .../worker/StreamingModeExecutionContextTest.java  |   6 +-
 .../worker/StreamingSideInputFetcherTest.java      |   1 +
 .../dataflow/worker/UserParDoFnFactoryTest.java    |   2 +
 .../dataflow/worker/WindmillKeyedWorkItemTest.java |   3 +-
 .../worker/WindmillTimerInternalsTest.java         |  48 ++++---
 .../apache/beam/runners/samza/runtime/DoFnOp.java  |   1 +
 .../beam/runners/samza/runtime/KeyedTimerData.java |   2 +-
 .../samza/runtime/SamzaTimerInternalsFactory.java  |  13 +-
 .../runners/samza/runtime/KeyedTimerDataTest.java  |   2 +-
 .../runtime/SamzaTimerInternalsFactoryTest.java    |  25 ++--
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |   4 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  38 ++++++
 31 files changed, 439 insertions(+), 167 deletions(-)

diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 180d2fe..f4e8d10 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -128,7 +128,11 @@ public class ApexTimerInternalsTest {
     TimerDataCoderV2 timerDataCoder = TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
     TimerData timerData =
         TimerData.of(
-            "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME);
+            "arbitrary-id",
+            StateNamespaces.global(),
+            new Instant(0),
+            new Instant(0),
+            TimeDomain.EVENT_TIME);
     String key = "key";
     ApexTimerInternals<String> timerInternals = new ApexTimerInternals<>(timerDataCoder);
     timerInternals.setContext(key, StringUtf8Coder.of(), Instant.now(), null);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index ac6185b..8bdcc87 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -132,12 +132,12 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
 
     @Override
     public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
     }
 
     @Override
     public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
     }
 
     @Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 01612f0..3ef996f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -545,7 +545,8 @@ public class SplittableParDoViaKeyedWorkItems {
       holdState.add(futureOutputWatermark);
       // Set a timer to continue processing this element.
       timerInternals.setTimer(
-          TimerInternals.TimerData.of(stateNamespace, wakeupTime, TimeDomain.PROCESSING_TIME));
+          TimerInternals.TimerData.of(
+              stateNamespace, wakeupTime, wakeupTime, TimeDomain.PROCESSING_TIME));
     }
 
     private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapContextAsStartBundle(
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 5ea3fb7..413884d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -214,27 +214,6 @@ public interface TimerInternals {
     }
 
     /**
-     * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
-     * generated. Construct a {@link TimerData} for the given parameters except for {@code
-     * outputTimestamp}. {@code outputTimestamp} is set to timer {@code timestamp}.
-     */
-    public static TimerData of(
-        String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      return new AutoValue_TimerInternals_TimerData(
-          timerId, "", namespace, timestamp, timestamp, domain);
-    }
-
-    public static TimerData of(
-        String timerId,
-        String timerFamilyId,
-        StateNamespace namespace,
-        Instant timestamp,
-        TimeDomain domain) {
-      return new AutoValue_TimerInternals_TimerData(
-          timerId, timerFamilyId, namespace, timestamp, timestamp, domain);
-    }
-
-    /**
      * Construct a {@link TimerData} for the given parameters except for timer ID. Timer ID is
      * deterministically generated from the {@code timestamp} and {@code domain}.
      */
@@ -250,15 +229,6 @@ public interface TimerInternals {
     }
 
     /**
-     * Construct a {@link TimerData} for the given parameters, where the timer ID is
-     * deterministically generated from the {@code timestamp} and {@code domain}. Also, output
-     * timestamp is set to the timer timestamp by default.
-     */
-    public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      return of(namespace, timestamp, timestamp, domain);
-    }
-
-    /**
      * {@inheritDoc}.
      *
      * <p>Used for sorting {@link TimerData} by timestamp. Furthermore, we compare timers by all the
@@ -364,7 +334,7 @@ public interface TimerInternals {
           StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
       Instant timestamp = INSTANT_CODER.decode(inStream);
       TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
-      return TimerData.of(timerId, namespace, timestamp, domain);
+      return TimerData.of(timerId, namespace, timestamp, timestamp, domain);
     }
 
     @Override
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 3ab7932..bc997c3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -39,8 +39,10 @@ public class InMemoryTimerInternalsTest {
   @Test
   public void testFiringEventTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME);
+    TimerData eventTimer1 =
+        TimerData.of(ID1, NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData eventTimer2 =
+        TimerData.of(ID2, NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
 
     underTest.setTimer(eventTimer1);
     underTest.setTimer(eventTimer2);
@@ -79,7 +81,7 @@ public class InMemoryTimerInternalsTest {
     underTest.advanceInputWatermark(laterTimestamp.plus(1L));
     assertThat(
         underTest.removeNextEventTimer(),
-        equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, TimeDomain.EVENT_TIME)));
+        equalTo(TimerData.of(ID1, "", NS1, laterTimestamp, laterTimestamp, TimeDomain.EVENT_TIME)));
   }
 
   @Test
@@ -107,8 +109,10 @@ public class InMemoryTimerInternalsTest {
   @Test
   public void testFiringProcessingTimeTimers() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime1 =
+        TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 =
+        TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
 
     underTest.setTimer(processingTime1);
     underTest.setTimer(processingTime2);
@@ -136,14 +140,20 @@ public class InMemoryTimerInternalsTest {
   @Test
   public void testTimerOrdering() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData eventTime1 =
+        TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime1 =
+        TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
     TimerData synchronizedProcessingTime1 =
-        TimerData.of(NS1, new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    TimerData eventTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            NS1, new Instant(19), new Instant(19), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    TimerData eventTime2 =
+        TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.EVENT_TIME);
+    TimerData processingTime2 =
+        TimerData.of(NS1, new Instant(29), new Instant(29), TimeDomain.PROCESSING_TIME);
     TimerData synchronizedProcessingTime2 =
-        TimerData.of(NS1, new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+        TimerData.of(
+            NS1, new Instant(29), new Instant(29), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
 
     underTest.setTimer(processingTime1);
     underTest.setTimer(eventTime1);
@@ -176,8 +186,10 @@ public class InMemoryTimerInternalsTest {
   @Test
   public void testDeduplicate() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
-    TimerData eventTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData eventTime =
+        TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime =
+        TimerData.of(NS1, new Instant(19), new Instant(19), TimeDomain.PROCESSING_TIME);
     underTest.setTimer(eventTime);
     underTest.setTimer(eventTime);
     underTest.setTimer(processingTime);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
index 1aebb03..5de5749 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/KeyedWorkItemCoderTest.java
@@ -43,7 +43,11 @@ public class KeyedWorkItemCoderTest {
   public void testEncodeDecodeEqual() throws Exception {
     Iterable<TimerData> timers =
         ImmutableList.of(
-            TimerData.of(StateNamespaces.global(), new Instant(500L), TimeDomain.EVENT_TIME));
+            TimerData.of(
+                StateNamespaces.global(),
+                new Instant(500L),
+                new Instant(500L),
+                TimeDomain.EVENT_TIME));
     Iterable<WindowedValue<Integer>> elements =
         ImmutableList.of(
             WindowedValue.valueInGlobalWindow(1),
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f316d07..0e2e9a3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -570,7 +570,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
     ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner();
     ArrayList<TimerData> timers = new ArrayList<>(1);
     timers.add(
-        TimerData.of(StateNamespaces.window(windowFn.windowCoder(), window), timestamp, domain));
+        TimerData.of(
+            StateNamespaces.window(windowFn.windowCoder(), window), timestamp, timestamp, domain));
     runner.onTimers(timers);
     runner.persist();
   }
@@ -583,6 +584,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
           TimerData.of(
               StateNamespaces.window(windowFn.windowCoder(), window),
               timer.getTimestamp(),
+              timer.getTimestamp(),
               timer.getValue()));
     }
     runner.onTimers(timerData);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index a1c3d72..9daf372 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -441,6 +441,7 @@ public class SimpleDoFnRunnerTest {
           TimerData.of(
               DoFnWithTimers.TIMER_ID,
               StateNamespaces.window(windowCoder, (W) context.window()),
+              context.fireTimestamp(),
               context.timestamp(),
               context.timeDomain()));
     }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index f9d4296..a0a6102 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -318,6 +318,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
                 timerId,
                 StateNamespaces.window(IntervalWindow.getCoder(), window),
                 timestamp,
+                timestamp,
                 TimeDomain.EVENT_TIME)));
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
index ab2978f..c6a36bd 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
@@ -42,7 +42,11 @@ public class TimerInternalsTest {
     CoderProperties.coderDecodeEncodeEqual(
         TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE),
         TimerData.of(
-            "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
+            "arbitrary-id",
+            StateNamespaces.global(),
+            new Instant(0),
+            new Instant(0),
+            TimeDomain.EVENT_TIME));
 
     Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
     CoderProperties.coderDecodeEncodeEqual(
@@ -52,6 +56,7 @@ public class TimerInternalsTest {
             StateNamespaces.window(
                 windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
             new Instant(99),
+            new Instant(99),
             TimeDomain.PROCESSING_TIME));
   }
 
@@ -64,10 +69,12 @@ public class TimerInternalsTest {
   public void testCompareEqual() {
     Instant timestamp = new Instant(100);
     StateNamespace namespace = StateNamespaces.global();
-    TimerData timer = TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME);
+    TimerData timer = TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
 
     assertThat(
-        timer, comparesEqualTo(TimerData.of("id", namespace, timestamp, TimeDomain.EVENT_TIME)));
+        timer,
+        comparesEqualTo(
+            TimerData.of("id", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME)));
   }
 
   @Test
@@ -76,8 +83,10 @@ public class TimerInternalsTest {
     Instant secondTimestamp = new Instant(200);
     StateNamespace namespace = StateNamespaces.global();
 
-    TimerData firstTimer = TimerData.of(namespace, firstTimestamp, TimeDomain.EVENT_TIME);
-    TimerData secondTimer = TimerData.of(namespace, secondTimestamp, TimeDomain.EVENT_TIME);
+    TimerData firstTimer =
+        TimerData.of(namespace, firstTimestamp, firstTimestamp, TimeDomain.EVENT_TIME);
+    TimerData secondTimer =
+        TimerData.of(namespace, secondTimestamp, secondTimestamp, TimeDomain.EVENT_TIME);
 
     assertThat(firstTimer, lessThan(secondTimer));
   }
@@ -87,10 +96,10 @@ public class TimerInternalsTest {
     Instant timestamp = new Instant(100);
     StateNamespace namespace = StateNamespaces.global();
 
-    TimerData eventTimer = TimerData.of(namespace, timestamp, TimeDomain.EVENT_TIME);
-    TimerData procTimer = TimerData.of(namespace, timestamp, TimeDomain.PROCESSING_TIME);
+    TimerData eventTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
+    TimerData procTimer = TimerData.of(namespace, timestamp, timestamp, TimeDomain.PROCESSING_TIME);
     TimerData synchronizedProcTimer =
-        TimerData.of(namespace, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+        TimerData.of(namespace, timestamp, timestamp, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
 
     assertThat(eventTimer, lessThan(procTimer));
     assertThat(eventTimer, lessThan(synchronizedProcTimer));
@@ -107,8 +116,10 @@ public class TimerInternalsTest {
     StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow);
     StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow);
 
-    TimerData secondEventTime = TimerData.of(firstWindowNs, timestamp, TimeDomain.EVENT_TIME);
-    TimerData thirdEventTime = TimerData.of(secondWindowNs, timestamp, TimeDomain.EVENT_TIME);
+    TimerData secondEventTime =
+        TimerData.of(firstWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
+    TimerData thirdEventTime =
+        TimerData.of(secondWindowNs, timestamp, timestamp, TimeDomain.EVENT_TIME);
 
     assertThat(secondEventTime, lessThan(thirdEventTime));
   }
@@ -118,8 +129,10 @@ public class TimerInternalsTest {
     Instant timestamp = new Instant(100);
     StateNamespace namespace = StateNamespaces.global();
 
-    TimerData id0Timer = TimerData.of("id0", namespace, timestamp, TimeDomain.EVENT_TIME);
-    TimerData id1Timer = TimerData.of("id1", namespace, timestamp, TimeDomain.EVENT_TIME);
+    TimerData id0Timer =
+        TimerData.of("id0", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
+    TimerData id1Timer =
+        TimerData.of("id1", namespace, timestamp, timestamp, TimeDomain.EVENT_TIME);
 
     assertThat(id0Timer, lessThan(id1Timer));
   }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 21f2d13..bd59f87 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -413,12 +413,12 @@ public class TriggerStateMachineTester<InputT, W extends BoundedWindow> {
 
     @Override
     public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
     }
 
     @Override
     public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timestamp, timeDomain));
     }
 
     @Override
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
index dfec1f1..5bd0d39 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectTimerInternalsTest.java
@@ -62,13 +62,22 @@ public class DirectTimerInternalsTest {
   @Test
   public void setTimerAddsToBuilder() {
     TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(20145L),
+            new Instant(20145L),
+            TimeDomain.EVENT_TIME);
     TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(125555555L),
+            new Instant(125555555L),
+            TimeDomain.PROCESSING_TIME);
     TimerData synchronizedProcessingTimer =
         TimerData.of(
             StateNamespaces.global(),
             new Instant(98745632189L),
+            new Instant(98745632189L),
             TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     internals.setTimer(eventTimer);
     internals.setTimer(processingTimer);
@@ -82,13 +91,22 @@ public class DirectTimerInternalsTest {
   @Test
   public void deleteTimerDeletesOnBuilder() {
     TimerData eventTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(20145L),
+            new Instant(20145L),
+            TimeDomain.EVENT_TIME);
     TimerData processingTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(125555555L),
+            new Instant(125555555L),
+            TimeDomain.PROCESSING_TIME);
     TimerData synchronizedProcessingTimer =
         TimerData.of(
             StateNamespaces.global(),
             new Instant(98745632189L),
+            new Instant(98745632189L),
             TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     internals.deleteTimer(eventTimer);
     internals.deleteTimer(processingTimer);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
index 3f10123..3e7871b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DoFnLifecycleManagerRemovingTransformEvaluatorTest.java
@@ -104,7 +104,12 @@ public class DoFnLifecycleManagerRemovingTransformEvaluatorTest {
 
     try {
       evaluator.onTimer(
-          TimerData.of("foo", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME),
+          TimerData.of(
+              "foo",
+              StateNamespaces.global(),
+              new Instant(0),
+              new Instant(0),
+              TimeDomain.EVENT_TIME),
           "",
           GlobalWindow.INSTANCE);
     } catch (Exception e) {
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index b2262af..055b48e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -316,7 +316,8 @@ public class EvaluationContextTest {
 
     StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
     TimerData toFire =
-        TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(), new Instant(100L), new Instant(100L), TimeDomain.EVENT_TIME);
     TransformResult<?> timerResult =
         StepTransformResult.withoutHold(downstreamProducer)
             .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null))
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index eef43c7..6515e22 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -990,9 +990,17 @@ public class WatermarkManagerTest implements Serializable {
     StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
     CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
     TimerData pastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(250L),
+            new Instant(250L),
+            TimeDomain.PROCESSING_TIME);
     TimerData futureTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(4096L),
+            new Instant(4096L),
+            TimeDomain.PROCESSING_TIME);
     TimerUpdate timers = TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
     manager.updateWatermarks(
         createdBundle,
@@ -1125,7 +1133,8 @@ public class WatermarkManagerTest implements Serializable {
     CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 4);
     Instant upstreamHold = new Instant(2048L);
     TimerData upstreamProcessingTimer =
-        TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(), upstreamHold, upstreamHold, TimeDomain.PROCESSING_TIME);
     manager.updateWatermarks(
         created,
         TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
@@ -1214,11 +1223,20 @@ public class WatermarkManagerTest implements Serializable {
     manager.refreshAll();
 
     TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(1000), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(), new Instant(1000), new Instant(1000), TimeDomain.EVENT_TIME);
     TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(5000L),
+            new Instant(5000L),
+            TimeDomain.EVENT_TIME);
     TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(10000L),
+            new Instant(10000L),
+            TimeDomain.EVENT_TIME);
     StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
@@ -1290,11 +1308,23 @@ public class WatermarkManagerTest implements Serializable {
         new Instant(1500L));
 
     TimerData earliestTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(999L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(999L),
+            new Instant(999L),
+            TimeDomain.PROCESSING_TIME);
     TimerData middleTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(5000L),
+            new Instant(5000L),
+            TimeDomain.PROCESSING_TIME);
     TimerData lastTimer =
-        TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(10000L),
+            new Instant(10000L),
+            TimeDomain.PROCESSING_TIME);
     StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
@@ -1367,13 +1397,22 @@ public class WatermarkManagerTest implements Serializable {
 
     TimerData earliestTimer =
         TimerData.of(
-            StateNamespaces.global(), new Instant(999L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+            StateNamespaces.global(),
+            new Instant(999L),
+            new Instant(999L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     TimerData middleTimer =
         TimerData.of(
-            StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+            StateNamespaces.global(),
+            new Instant(5000L),
+            new Instant(5000L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     TimerData lastTimer =
         TimerData.of(
-            StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+            StateNamespaces.global(),
+            new Instant(10000L),
+            new Instant(10000L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
@@ -1436,11 +1475,19 @@ public class WatermarkManagerTest implements Serializable {
 
     TimerData initialTimer =
         TimerData.of(
-            timerId, StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
+            timerId,
+            StateNamespaces.global(),
+            new Instant(5000L),
+            new Instant(5000L),
+            TimeDomain.PROCESSING_TIME);
 
     TimerData overridingTimer =
         TimerData.of(
-            timerId, StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
+            timerId,
+            StateNamespaces.global(),
+            new Instant(10000L),
+            new Instant(10000L),
+            TimeDomain.PROCESSING_TIME);
 
     TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
     TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
@@ -1483,9 +1530,19 @@ public class WatermarkManagerTest implements Serializable {
     StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
 
     TimerData initialTimer =
-        TimerData.of(timerId, StateNamespaces.global(), new Instant(1000L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            timerId,
+            StateNamespaces.global(),
+            new Instant(1000L),
+            new Instant(1000L),
+            TimeDomain.EVENT_TIME);
     TimerData overridingTimer =
-        TimerData.of(timerId, StateNamespaces.global(), new Instant(2000L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            timerId,
+            StateNamespaces.global(),
+            new Instant(2000L),
+            new Instant(2000L),
+            TimeDomain.EVENT_TIME);
 
     TimerUpdate initialUpdate = TimerUpdate.builder(key).setTimer(initialTimer).build();
     TimerUpdate overridingUpdate = TimerUpdate.builder(key).setTimer(overridingTimer).build();
@@ -1543,9 +1600,19 @@ public class WatermarkManagerTest implements Serializable {
     // Apply a timer update
     StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
     TimerData timer1 =
-        TimerData.of("a", StateNamespaces.global(), new Instant(100), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            "a",
+            StateNamespaces.global(),
+            new Instant(100),
+            new Instant(100),
+            TimeDomain.EVENT_TIME);
     TimerData timer2 =
-        TimerData.of("a", StateNamespaces.global(), new Instant(200), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            "a",
+            StateNamespaces.global(),
+            new Instant(200),
+            new Instant(200),
+            TimeDomain.EVENT_TIME);
     underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
 
     // Only the last timer update should be observable
@@ -1575,14 +1642,27 @@ public class WatermarkManagerTest implements Serializable {
 
   @Test
   public void timerUpdateBuilderBuildAddsAllAddedTimers() {
-    TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), TimeDomain.EVENT_TIME);
+    TimerData set =
+        TimerData.of(
+            StateNamespaces.global(), new Instant(10L), new Instant(10L), TimeDomain.EVENT_TIME);
     TimerData deleted =
-        TimerData.of(StateNamespaces.global(), new Instant(24L), TimeDomain.PROCESSING_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(24L),
+            new Instant(24L),
+            TimeDomain.PROCESSING_TIME);
     TimerData completedOne =
         TimerData.of(
-            StateNamespaces.global(), new Instant(1024L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+            StateNamespaces.global(),
+            new Instant(1024L),
+            new Instant(1024L),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     TimerData completedTwo =
-        TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
+        TimerData.of(
+            StateNamespaces.global(),
+            new Instant(2048L),
+            new Instant(2048L),
+            TimeDomain.EVENT_TIME);
 
     TimerUpdate update =
         TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of()))
@@ -1599,7 +1679,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithSetAtEndOfTime() {
     Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
-    TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
+    TimerData tooFar =
+        TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME);
 
     TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
     thrown.expect(IllegalArgumentException.class);
@@ -1610,7 +1691,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithSetPastEndOfTime() {
     Instant timerStamp = BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.standardMinutes(2));
-    TimerData tooFar = TimerData.of(StateNamespaces.global(), timerStamp, TimeDomain.EVENT_TIME);
+    TimerData tooFar =
+        TimerData.of(StateNamespaces.global(), timerStamp, timerStamp, TimeDomain.EVENT_TIME);
 
     TimerUpdateBuilder builder = TimerUpdate.builder(StructuralKey.empty());
     thrown.expect(IllegalArgumentException.class);
@@ -1621,7 +1703,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithSetThenDeleteHasOnlyDeleted() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.setTimer(timer).deletedTimer(timer).build();
 
@@ -1632,7 +1715,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithDeleteThenSetHasOnlySet() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.deletedTimer(timer).setTimer(timer).build();
 
@@ -1643,7 +1727,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithSetAfterBuildNotAddedToBuilt() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.build();
     builder.setTimer(timer);
@@ -1655,7 +1740,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithDeleteAfterBuildNotAddedToBuilt() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.build();
     builder.deletedTimer(timer);
@@ -1667,7 +1753,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateBuilderWithCompletedAfterBuildNotAddedToBuilt() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.build();
     builder.withCompletedTimers(ImmutableList.of(timer));
@@ -1679,7 +1766,8 @@ public class WatermarkManagerTest implements Serializable {
   @Test
   public void timerUpdateWithCompletedTimersNotAddedToExisting() {
     TimerUpdateBuilder builder = TimerUpdate.builder(null);
-    TimerData timer = TimerData.of(StateNamespaces.global(), Instant.now(), TimeDomain.EVENT_TIME);
+    Instant now = Instant.now();
+    TimerData timer = TimerData.of(StateNamespaces.global(), now, now, TimeDomain.EVENT_TIME);
 
     TimerUpdate built = builder.build();
     assertThat(built.getCompletedTimers(), emptyIterable());
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 932fac8..efea477 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -759,6 +759,7 @@ public class ExecutableStageDoFnOperatorTest {
                 timerInputKey.getKey(), timerInputKey.getValue()),
             stateNamespace,
             window.maxTimestamp(),
+            window.maxTimestamp(),
             TimeDomain.EVENT_TIME);
     timerInternals.setTimer(userTimer);
 
@@ -793,6 +794,7 @@ public class ExecutableStageDoFnOperatorTest {
                 timerInputKey.getKey(), timerInputKey.getValue()),
             stateNamespace,
             window.maxTimestamp(),
+            window.maxTimestamp(),
             TimeDomain.EVENT_TIME);
     operator.setTimer(
         Timer.of(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 5d92d2a..c8416c4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -20,6 +20,8 @@ package org.apache.beam.runners.dataflow.worker;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
@@ -29,6 +31,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+import org.apache.beam.sdk.util.ExposedByteArrayOutputStream;
+import org.apache.beam.sdk.util.VarInt;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
@@ -278,27 +283,77 @@ class WindmillTimerInternals implements TimerInternals {
     //    - the GlobalWindow is currently encoded in zero bytes, so it becomes "//"
     //    - the Global StateNamespace is different, and becomes "/"
     //  - the id is totally arbitrary; currently unescaped though that could change
-    String tag = timer.getTag().toStringUtf8();
+
+    ByteString tag = timer.getTag();
     checkArgument(
-        timer.getTag().startsWith(prefix.byteString()),
+        tag.startsWith(prefix.byteString()),
         "Expected timer tag %s to start with prefix %s",
         tag,
         prefix.byteString());
+
+    Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+
+    // Parse the namespace.
     int namespaceStart = prefix.byteString().size(); // drop the prefix, leave the begin slash
-    int namespaceEnd = tag.indexOf('+', namespaceStart); // keep the end slash, drop the +
-    String namespaceString = tag.substring(namespaceStart, namespaceEnd);
-    String timerIdPlusTimerFamilyId = tag.substring(namespaceEnd + 1); // timerId+timerFamilyId
-    int timerIdEnd = timerIdPlusTimerFamilyId.indexOf('+'); // end of timerId
-    // if no '+' found then timerFamilyId is empty string else they have a '+' separator
-    String familyId = timerIdEnd == -1 ? "" : timerIdPlusTimerFamilyId.substring(timerIdEnd + 1);
-    String id =
-        timerIdEnd == -1
-            ? timerIdPlusTimerFamilyId
-            : timerIdPlusTimerFamilyId.substring(0, timerIdEnd);
+    int namespaceEnd = namespaceStart;
+    while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
+      namespaceEnd++;
+    }
+    String namespaceString = tag.substring(namespaceStart, namespaceEnd).toStringUtf8();
+
+    // Parse the timer id.
+    int timerIdStart = namespaceEnd + 1;
+    int timerIdEnd = timerIdStart;
+    while (timerIdEnd < tag.size() && tag.byteAt(timerIdEnd) != '+') {
+      timerIdEnd++;
+    }
+    String timerId = tag.substring(timerIdStart, timerIdEnd).toStringUtf8();
+
+    // Parse the timer family.
+    int timerFamilyStart = timerIdEnd + 1;
+    int timerFamilyEnd = timerFamilyStart;
+    while (timerFamilyEnd < tag.size() && tag.byteAt(timerFamilyEnd) != '+') {
+      timerFamilyEnd++;
+    }
+    // For backwards compatibility, handle the case were the timer family isn't present.
+    String timerFamily =
+        (timerFamilyStart < tag.size())
+            ? tag.substring(timerFamilyStart, timerFamilyEnd).toStringUtf8()
+            : "";
+
+    // Parse the output timestamp.
+    int outputTimestampStart = timerFamilyEnd + 1;
+    int outputTimestampEnd = outputTimestampStart;
+    while (outputTimestampEnd < tag.size() && tag.byteAt(outputTimestampEnd) != '+') {
+      outputTimestampEnd++;
+    }
+
+    // For backwards compatibility, handle the case were the output timestamp isn't present.
+    Instant outputTimestamp = timestamp;
+    if ((outputTimestampStart < tag.size())) {
+      try {
+        outputTimestamp =
+            new Instant(
+                VarInt.decodeLong(
+                    tag.substring(outputTimestampStart, outputTimestampEnd).newInput()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);
-    Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+    return TimerData.of(
+        timerId,
+        timerFamily,
+        namespace,
+        timestamp,
+        outputTimestamp,
+        timerTypeToTimeDomain(timer.getType()));
+  }
 
-    return TimerData.of(id, familyId, namespace, timestamp, timerTypeToTimeDomain(timer.getType()));
+  private static boolean useNewTimerTagEncoding(TimerData timerData) {
+    return !timerData.getTimerFamilyId().isEmpty()
+        || !timerData.getOutputTimestamp().equals(timerData.getTimestamp());
   }
 
   /**
@@ -309,27 +364,41 @@ class WindmillTimerInternals implements TimerInternals {
    */
   public static ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
     String tagString;
-    // Timers without timerFamily would have timerFamily would be an empty string
-    if ("".equals(timerData.getTimerFamilyId())) {
-      tagString =
-          new StringBuilder()
-              .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
-              .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
-              .append('+')
-              .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
-              .toString();
-    } else {
-      tagString =
-          new StringBuilder()
-              .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
-              .append(timerData.getNamespace().stringKey()) // this must begin and end with a slash
-              .append('+')
-              .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
-              .append('+')
-              .append(timerData.getTimerFamilyId())
-              .toString();
+    ExposedByteArrayOutputStream out = new ExposedByteArrayOutputStream();
+    try {
+      if (useNewTimerTagEncoding(timerData)) {
+        tagString =
+            new StringBuilder()
+                .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
+                .append(
+                    timerData.getNamespace().stringKey()) // this must begin and end with a slash
+                .append('+')
+                .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
+                .append('+')
+                .append(timerData.getTimerFamilyId())
+                .toString();
+        out.write(tagString.getBytes(StandardCharsets.UTF_8));
+        // Only encode the extra 9 bytes if the output timestamp is different than the timestamp;
+        if (!timerData.getOutputTimestamp().equals(timerData.getTimestamp())) {
+          out.write('+');
+          VarInt.encode(timerData.getOutputTimestamp().getMillis(), out);
+        }
+      } else {
+        // Timers without timerFamily would have timerFamily would be an empty string
+        tagString =
+            new StringBuilder()
+                .append(prefix.byteString().toStringUtf8()) // this never ends with a slash
+                .append(
+                    timerData.getNamespace().stringKey()) // this must begin and end with a slash
+                .append('+')
+                .append(timerData.getTimerId()) // this is arbitrary; currently unescaped
+                .toString();
+        out.write(tagString.getBytes(StandardCharsets.UTF_8));
+      }
+      return ByteString.readFrom(new ExposedByteArrayInputStream(out.toByteArray()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
-    return ByteString.copyFromUtf8(tagString);
   }
 
   /**
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index bf9e875..0d5a883 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -149,6 +149,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
                 TimerData.of(
                     namespace,
                     timestamp,
+                    timestamp,
                     type == Windmill.Timer.Type.WATERMARK
                         ? TimeDomain.EVENT_TIME
                         : TimeDomain.PROCESSING_TIME)))
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
index 790a7d5..5d6f1e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
@@ -164,6 +164,7 @@ public class StreamingKeyedWorkItemSideInputDoFnRunnerTest {
     return TimerData.of(
         StateNamespaces.window(IntervalWindow.getCoder(), window),
         timestamp,
+        timestamp,
         type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : TimeDomain.PROCESSING_TIME);
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index ff957c1..d5071d7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -143,7 +143,11 @@ public class StreamingModeExecutionContextTest {
     TimerInternals timerInternals = stepContext.timerInternals();
 
     timerInternals.setTimer(
-        TimerData.of(new StateNamespaceForTest("key"), new Instant(5000), TimeDomain.EVENT_TIME));
+        TimerData.of(
+            new StateNamespaceForTest("key"),
+            new Instant(5000),
+            new Instant(5000),
+            TimeDomain.EVENT_TIME));
     executionContext.flushState();
 
     Windmill.Timer timer = outputBuilder.buildPartial().getOutputTimers(0);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
index c1a9945..c9a4227 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcherTest.java
@@ -203,6 +203,7 @@ public class StreamingSideInputFetcherTest {
     return TimerData.of(
         StateNamespaces.window(IntervalWindow.getCoder(), createWindow(timestamp)),
         new Instant(timestamp),
+        new Instant(timestamp),
         TimeDomain.EVENT_TIME);
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index 5e688f3..45889df 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -426,6 +426,7 @@ public class UserParDoFnFactoryTest {
                 SimpleParDoFn.CLEANUP_TIMER_ID,
                 firstWindowNamespace,
                 firstWindow.maxTimestamp().plus(1L),
+                firstWindow.maxTimestamp().plus(1L),
                 TimeDomain.EVENT_TIME))
         .thenReturn(null);
 
@@ -441,6 +442,7 @@ public class UserParDoFnFactoryTest {
                 SimpleParDoFn.CLEANUP_TIMER_ID,
                 secondWindowNamespace,
                 secondWindow.maxTimestamp().plus(1L),
+                secondWindow.maxTimestamp().plus(1L),
                 TimeDomain.EVENT_TIME))
         .thenReturn(null);
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index de62567..40d4c74 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -166,6 +166,7 @@ public class WindmillKeyedWorkItemTest {
                 TimerData.of(
                     ns,
                     new Instant(timestamp),
+                    new Instant(timestamp),
                     WindmillTimerInternals.timerTypeToTimeDomain(type))))
         .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new Instant(timestamp)))
         .setType(type)
@@ -174,7 +175,7 @@ public class WindmillKeyedWorkItemTest {
   }
 
   private static TimerData makeTimer(StateNamespace ns, long timestamp, TimeDomain domain) {
-    return TimerData.of(ns, new Instant(timestamp), domain);
+    return TimerData.of(ns, new Instant(timestamp), new Instant(timestamp), domain);
   }
 
   @Test
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index d0f0f8a..3c1f6bd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -79,25 +79,43 @@ public class WindmillTimerInternalsTest {
         for (TimeDomain timeDomain : TimeDomain.values()) {
           for (WindmillNamespacePrefix prefix : WindmillNamespacePrefix.values()) {
             for (Instant timestamp : TEST_TIMESTAMPS) {
-              TimerData anonymousTimerData = TimerData.of(namespace, timestamp, timeDomain);
-
-              assertThat(
-                  WindmillTimerInternals.windmillTimerToTimerData(
-                      prefix,
-                      WindmillTimerInternals.timerDataToWindmillTimer(
-                          stateFamily, prefix, anonymousTimerData),
-                      coder),
-                  equalTo(anonymousTimerData));
-
-              for (String timerId : TEST_TIMER_IDS) {
-                TimerData timerData = TimerData.of(timerId, namespace, timestamp, timeDomain);
+              List<TimerData> anonymousTimers =
+                  ImmutableList.of(
+                      TimerData.of(namespace, timestamp, timestamp, timeDomain),
+                      TimerData.of(namespace, timestamp, timestamp.minus(1), timeDomain));
+              for (TimerData timer : anonymousTimers) {
                 assertThat(
                     WindmillTimerInternals.windmillTimerToTimerData(
                         prefix,
-                        WindmillTimerInternals.timerDataToWindmillTimer(
-                            stateFamily, prefix, timerData),
+                        WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
                         coder),
-                    equalTo(timerData));
+                    equalTo(timer));
+              }
+
+              for (String timerId : TEST_TIMER_IDS) {
+                List<TimerData> timers =
+                    ImmutableList.of(
+                        TimerData.of(timerId, namespace, timestamp, timestamp, timeDomain),
+                        TimerData.of(
+                            timerId, "family", namespace, timestamp, timestamp, timeDomain),
+                        TimerData.of(timerId, namespace, timestamp, timestamp.minus(1), timeDomain),
+                        TimerData.of(
+                            timerId,
+                            "family",
+                            namespace,
+                            timestamp,
+                            timestamp.minus(1),
+                            timeDomain));
+
+                for (TimerData timer : timers) {
+                  assertThat(
+                      WindmillTimerInternals.windmillTimerToTimerData(
+                          prefix,
+                          WindmillTimerInternals.timerDataToWindmillTimer(
+                              stateFamily, prefix, timer),
+                          coder),
+                      equalTo(timer));
+                }
               }
             }
           }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index b9d6216..4dd4441 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -277,6 +277,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
             bundleCheckTimerId,
             StateNamespaces.global(),
             nextBundleCheckTime,
+            nextBundleCheckTime,
             TimeDomain.PROCESSING_TIME);
     bundleTimerScheduler.schedule(
         new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index a6214be..6a51967 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -160,7 +160,7 @@ public class KeyedTimerData<K> implements Comparable<KeyedTimerData<K>> {
       final StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
       final TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
-      final TimerData timer = TimerData.of(timerId, namespace, timestamp, domain);
+      final TimerData timer = TimerData.of(timerId, namespace, timestamp, timestamp, domain);
 
       byte[] keyBytes = null;
       K key = null;
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 22c2ac9..e5ba2d3 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -218,6 +218,7 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
                   timerData.getTimerId(),
                   timerData.getNamespace(),
                   new Instant(lastTimestamp),
+                  new Instant(lastTimestamp),
                   timerData.getDomain());
           deleteTimer(lastTimerData, false);
         }
@@ -244,12 +245,14 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
 
     @Override
     public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-      deleteTimer(TimerData.of(timerId, namespace, Instant.now(), timeDomain));
+      Instant now = Instant.now();
+      deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain));
     }
 
     @Override
     public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-      deleteTimer(TimerData.of(timerId, namespace, Instant.now(), TimeDomain.EVENT_TIME));
+      Instant now = Instant.now();
+      deleteTimer(TimerData.of(timerId, namespace, now, now, TimeDomain.EVENT_TIME));
     }
 
     @Override
@@ -455,7 +458,11 @@ public class SamzaTimerInternalsFactory<K> implements TimerInternalsFactory<K> {
           keyBytes,
           timerKey.key,
           TimerInternals.TimerData.of(
-              timerKey.timerId, timerKey.stateNamespace, new Instant(timestamp), domain));
+              timerKey.timerId,
+              timerKey.stateNamespace,
+              new Instant(timestamp),
+              new Instant(timestamp),
+              domain));
     }
 
     private TimerKey(K key, StateNamespace stateNamespace, String timerId) {
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index e521774..c16b4ce 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -36,7 +36,7 @@ public class KeyedTimerDataTest {
   public void testCoder() throws Exception {
     final TimerInternals.TimerData td =
         TimerInternals.TimerData.of(
-            "timer", StateNamespaces.global(), new Instant(), TimeDomain.EVENT_TIME);
+            "timer", StateNamespaces.global(), new Instant(), new Instant(), TimeDomain.EVENT_TIME);
 
     final String key = "timer-key";
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
index 0f536c3..d0d6126 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactoryTest.java
@@ -144,11 +144,13 @@ public class SamzaTimerInternalsFactoryTest {
     final StateNamespace nameSpace = StateNamespaces.global();
     final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
     final TimerInternals.TimerData timer1 =
-        TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer1);
 
     final TimerInternals.TimerData timer2 =
-        TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer2);
 
     timerInternalsFactory.setInputWatermark(new Instant(5));
@@ -181,11 +183,13 @@ public class SamzaTimerInternalsFactoryTest {
     final StateNamespace nameSpace = StateNamespaces.global();
     final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
     final TimerInternals.TimerData timer1 =
-        TimerInternals.TimerData.of("timer1", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer1);
 
     final TimerInternals.TimerData timer2 =
-        TimerInternals.TimerData.of("timer2", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer2);
 
     store.close();
@@ -226,12 +230,12 @@ public class SamzaTimerInternalsFactoryTest {
     final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
     final TimerInternals.TimerData timer1 =
         TimerInternals.TimerData.of(
-            "timer1", nameSpace, new Instant(10), TimeDomain.PROCESSING_TIME);
+            "timer1", nameSpace, new Instant(10), new Instant(10), TimeDomain.PROCESSING_TIME);
     timerInternals.setTimer(timer1);
 
     final TimerInternals.TimerData timer2 =
         TimerInternals.TimerData.of(
-            "timer2", nameSpace, new Instant(100), TimeDomain.PROCESSING_TIME);
+            "timer2", nameSpace, new Instant(100), new Instant(100), TimeDomain.PROCESSING_TIME);
     timerInternals.setTimer(timer2);
 
     assertEquals(2, timerRegistry.timers.size());
@@ -267,16 +271,19 @@ public class SamzaTimerInternalsFactoryTest {
     final StateNamespace nameSpace = StateNamespaces.global();
     final TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey("testKey");
     final TimerInternals.TimerData timer1 =
-        TimerInternals.TimerData.of("timerId", nameSpace, new Instant(10), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timerId", nameSpace, new Instant(10), new Instant(10), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer1);
 
     // this timer should override the first timer
     final TimerInternals.TimerData timer2 =
-        TimerInternals.TimerData.of("timerId", nameSpace, new Instant(100), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timerId", nameSpace, new Instant(100), new Instant(100), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer2);
 
     final TimerInternals.TimerData timer3 =
-        TimerInternals.TimerData.of("timerId2", nameSpace, new Instant(200), TimeDomain.EVENT_TIME);
+        TimerInternals.TimerData.of(
+            "timerId2", nameSpace, new Instant(200), new Instant(200), TimeDomain.EVENT_TIME);
     timerInternals.setTimer(timer3);
 
     // this timer shouldn't override since it has a different id
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index c4df77b..c673cbd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -270,10 +270,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   @Experimental(Kind.TIMERS)
   public abstract class OnTimerContext extends WindowedContext {
 
-    /** Returns the timestamp of the current timer. */
+    /** Returns the output timestamp of the current timer. */
     public abstract Instant timestamp();
 
-    /** Returns the output timestamp of the current timer. */
+    /** Returns the firing timestamp of the current timer. */
     public abstract Instant fireTimestamp();
 
     /** Returns the window in which the timer is firing. */
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index c71e963..bfae241 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3541,6 +3541,44 @@ public class ParDoTest implements Serializable {
       UsesTimersInParDo.class,
       DataflowPortabilityApiUnsupported.class
     })
+    public void testOutputTimestampDefault() throws Exception {
+      final String timerId = "foo";
+      DoFn<KV<String, Long>, Long> fn1 =
+          new DoFn<KV<String, Long>, Long>() {
+
+            @TimerId(timerId)
+            private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @ProcessElement
+            public void processElement(
+                @TimerId(timerId) Timer timer, @Timestamp Instant timestamp) {
+              timer
+                  .withOutputTimestamp(timestamp.plus(Duration.millis(5)))
+                  .set(timestamp.plus(Duration.millis(10)));
+            }
+
+            @OnTimer(timerId)
+            public void onTimer(@Timestamp Instant timestamp, OutputReceiver<Long> o) {
+              o.output(timestamp.getMillis());
+            }
+          };
+
+      PCollection<Long> output =
+          pipeline
+              .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 1L), new Instant(3))))
+              .setIsBoundedInternal(IsBounded.UNBOUNDED)
+              .apply("first", ParDo.of(fn1));
+
+      PAssert.that(output).containsInAnyOrder(new Instant(8).getMillis()); // result output
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      DataflowPortabilityApiUnsupported.class
+    })
     public void testOutOfBoundsEventTimeTimerHold() throws Exception {
       final String timerId = "foo";