You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/04/03 16:33:43 UTC

[beam] branch master updated: Update Timer encoding

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 151b2f8  Update Timer encoding
     new fff532e  Merge pull request #11199 from boyuanzz/timer_encoding
151b2f8 is described below

commit 151b2f80997810ed9311124d2831b70a51d1c07a
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Mon Mar 23 13:00:43 2020 -0700

    Update Timer encoding
---
 .../fn-execution/src/main/proto/beam_fn_api.proto  |   9 +-
 .../beam/model/fnexecution/v1/standard_coders.yaml |  42 ++++-
 .../pipeline/src/main/proto/beam_runner_api.proto  |  16 +-
 .../core/construction/CoderTranslators.java        |   4 +-
 .../core/construction/ParDoTranslation.java        |  13 +-
 .../beam/runners/core/construction/Timer.java      | 196 +++++++++++++++++----
 .../core/construction/CoderTranslationTest.java    |   2 +-
 .../runners/core/construction/CommonCoderTest.java |  33 +++-
 .../core/construction/ParDoTranslationTest.java    |   6 +-
 .../beam/runners/core/construction/TimerTest.java  | 142 ++++++++++++---
 .../streaming/ExecutableStageDoFnOperator.java     |  13 +-
 .../dataflow/worker/fn/control/TimerReceiver.java  |  16 +-
 .../worker/fn/control/TimerReceiverTest.java       |  10 +-
 .../fnexecution/control/TimerReceiverFactory.java  |   2 +-
 .../translation/PipelineTranslatorUtils.java       |  11 +-
 .../fnexecution/control/RemoteExecutionTest.java   |  22 ++-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  13 +-
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  11 +-
 sdks/python/apache_beam/coders/coder_impl.py       |  49 +++++-
 sdks/python/apache_beam/coders/coders.py           |  20 ++-
 .../apache_beam/coders/coders_test_common.py       |  33 ++--
 .../apache_beam/coders/standard_coders_test.py     |  25 ++-
 .../apache_beam/runners/worker/bundle_processor.py |  24 ++-
 .../apache_beam/runners/worker/operations.py       |   2 +-
 sdks/python/apache_beam/transforms/userstate.py    |  16 +-
 25 files changed, 577 insertions(+), 153 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 6590b88..68f2ceb 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -609,12 +609,13 @@ message Elements {
     // represents the producer of these timers.
     string transform_id = 2;
 
-    // (Optional) The local timer name used to identify the associated timer specification.
-    string timer_id = 3;
+    // (Optional) The local timer family name used to identify the associated
+    // timer family specification.
+    string timer_family_id = 3;
 
-    // (Optional) Represents a logical byte stream of a timer. Encoded according
+    // (Optional) Represents a logical byte stream of timers. Encoded according
     // to the coder in the timer spec.
-    bytes timer = 4;
+    bytes timers = 4;
 
     // (Optional) Set this bit to indicate the this is the last data block
     // for the given instruction and transform, ending the stream.
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index c9a4288..7afa00b 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -28,6 +28,23 @@
 #             one of a few standard JSON types such as numbers, strings, dicts that map naturally
 #             to the type encoded by the coder.
 #
+# Java code snippet to generate example bytes:
+#   Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+#   Instant now = new Instant(1000L);
+#   Timer<String> timer = Timer.of(
+#     "key",
+#     "tag",
+#     Collections.singletonList(GlobalWindow.INSTANCE),
+#     now,
+#     now,
+#     PaneInfo.NO_FIRING);
+#   byte[] bytes = CoderUtils.encodeToByteArray(coder, timer);
+#   String str = new String(bytes, java.nio.charset.StandardCharsets.ISO_8859_1);
+#   String example = "";
+#   for(int i = 0; i < str.length(); i++){
+#      example += CharUtils.unicodeEscaped(str.charAt(i));
+#   }
+#
 # These choices were made to strike a balance between portability, ease of use, and simple
 # legibility of this file itself.
 #
@@ -207,15 +224,24 @@ examples:
 
 coder:
   urn: "beam:coder:timer:v1"
-  components: [{urn: "beam:coder:bytes:v1"}]
+  components: [{urn: "beam:coder:string_utf8:v1"},
+               {urn: "beam:coder:global_window:v1"}]
 examples:
-  "\0\0\0\0\0\0\0\0\u0003abc": {timestamp: -9223372036854775808, payload: abc}
-  "\x7fÿÿÿÿÿÿ\x01\u0003abc": {timestamp: -255, payload: abc}
-  "\x7fÿÿÿÿÿÿÿ\u0003abc": {timestamp: -1, payload: abc}
-  "\x80\0\0\0\0\0\0\0\u0003abc": {timestamp: 0, payload: abc}
-  "\x80\0\0\0\0\0\0\x01\u0003abc": {timestamp: 1, payload: abc}
-  "\x80\0\0\0\0\0\x01\0\u0003abc": {timestamp: 256, payload: abc}
-  "ÿÿÿÿÿÿÿÿ\u0003abc": {timestamp: 9223372036854775807, payload: abc}
+  "\u0003\u006b\u0065\u0079\u0003\u0074\u0061\u0067\u0000\u0000\u0000\u0001\u0000\u0080\u0000\u0000\u0000\u0000\u0000\u0004\u00d2\u0080\u0000\u0000\u0000\u0000\u0000\u0016\u002e\u000f": {
+    userKey: key,
+    dynamicTimerTag: tag,
+    windows: ["global"],
+    clearBit: False,
+    fireTimestamp: 1234,
+    holdTimestamp: 5678,
+    pane: {is_first: True, is_last: True, timing: UNKNOWN, index: 0, on_time_index: 0},
+  }
+  "\u0003\u006b\u0065\u0079\u0003\u0074\u0061\u0067\u0000\u0000\u0000\u0001\u0001": {
+    userKey: key,
+    dynamicTimerTag: tag,
+    windows: ["global"],
+    clearBit: True,
+  }
 
 ---
 
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 17229d2..7d63501 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -693,9 +693,14 @@ message StandardCoders {
     // Components: Coder for a single element.
     ITERABLE = 3 [(beam_urn) = "beam:coder:iterable:v1"];
 
-    // Encodes a timer containing a timestamp and a user specified payload.
-    // The encoding is represented as: timestamp payload
-    //   timestamp - a big endian 8 byte integer representing millis-since-epoch.
+    // Encodes a timer containing a user key, a dynamic timer tag, a clear bit,
+    // a fire timestamp, a hold timestamp, the windows and the paneinfo.
+    // The encoding is represented as:
+    //   user key - user defined key, uses the component coder.
+    //   dynamic timer tag - a string which identifies a timer.
+    //   windows - uses component coders.
+    //   clear bit - a boolean set for clearing the timer.
+    //   fire timestamp - a big endian 8 byte integer representing millis-since-epoch.
     //     The encoded representation is shifted so that the byte representation of
     //     negative values are lexicographically ordered before the byte representation
     //     of positive values. This is typically done by subtracting -9223372036854775808
@@ -708,8 +713,9 @@ message StandardCoders {
     //                        1: 80 00 00 00 00 00 00 01
     //                      256: 80 00 00 00 00 00 01 00
     //      9223372036854775807: FF FF FF FF FF FF FF FF
-    //   payload - user defined data, uses the component coder
-    // Components: Coder for the payload.
+    //   hold timestamp - similar to the fire timestamp.
+    //   paneinfo - similar to the paneinfo of the windowed_value.
+    // Components: Coder for the key and windows.
     TIMER = 4 [(beam_urn) = "beam:coder:timer:v1"];
 
     /*
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
index d7abf4e..ae7e9dc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -86,12 +86,12 @@ class CoderTranslators {
     return new SimpleStructuredCoderTranslator<Timer.Coder<?>>() {
       @Override
       public List<? extends Coder<?>> getComponents(Timer.Coder<?> from) {
-        return from.getCoderArguments();
+        return from.getComponents();
       }
 
       @Override
       public Timer.Coder<?> fromComponents(List<Coder<?>> components) {
-        return Timer.Coder.of(components.get(0));
+        return Timer.Coder.of(components.get(0), (Coder<BoundedWindow>) components.get(1));
       }
     };
   }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 969ae25..d74c4e4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -50,7 +50,7 @@ import org.apache.beam.runners.core.construction.PTransformTranslation.Transform
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
@@ -70,6 +70,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.DoFnWithExecutionInformation;
 import org.apache.beam.sdk.util.SerializableUtils;
@@ -195,9 +196,7 @@ public class ParDoTranslation {
                 mainInput.isBounded(),
                 KvCoder.of(
                     ((KvCoder) mainInput.getCoder()).getKeyCoder(),
-                    // TODO: Add support for timer payloads to the SDK
-                    // We currently assume that all payloads are unspecified.
-                    Timer.Coder.of(VoidCoder.of())));
+                    Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)));
         timerPCollection.setName(
             String.format("%s.%s", appliedPTransform.getFullName(), localTimerName));
         String timerPCollectionId = components.registerPCollection(timerPCollection);
@@ -656,12 +655,14 @@ public class ParDoTranslation {
       throw new RuntimeException("Failure to register coder", exc);
     }
   }
-
+  // TODO(BEAM-9562): Plumb through actual keyCoder and windowCoder.
   public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
       TimerSpec timer, SdkComponents components) {
     return RunnerApi.TimerFamilySpec.newBuilder()
         .setTimeDomain(translateTimeDomain(timer.getTimeDomain()))
-        .setTimerFamilyCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of())))
+        .setTimerFamilyCoderId(
+            registerCoderOrThrow(
+                components, Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)))
         .build();
   }
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 66afe7c..158d471 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -21,17 +21,29 @@ import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CollectionCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.joda.time.Instant;
 
 /**
- * A timer consists of a timestamp and a corresponding user supplied payload.
+ * A timer consists of a user key, a dynamic timer tag, a set of windows and either a bit that says
+ * that this timer should be cleared or data representing the firing timestamp, hold timestamp and
+ * pane information that should be used when producing output.
  *
  * <p>Note that this is an implementation helper specifically intended for use during execution by
  * runners and the Java SDK harness. The API for pipeline authors is {@link
@@ -40,92 +52,198 @@ import org.joda.time.Instant;
 @AutoValue
 public abstract class Timer<T> {
 
-  /** Returns a timer for the given timestamp with a {@code null} payload. */
-  public static Timer<Void> of(Instant time) {
-    return of(time, (Void) null);
+  /**
+   * Returns a non-cleared timer for the given {@code userKey}, {@code dynamicTimerTag}, {@code
+   * fireTimestamp}, {@code holdTimestamp}, {@code windows} and {@code pane}.
+   */
+  public static <T> Timer<T> of(
+      T userKey,
+      String dynamicTimerTag,
+      Collection<? extends BoundedWindow> windows,
+      Instant fireTimestamp,
+      Instant holdTimestamp,
+      PaneInfo pane) {
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, false, fireTimestamp, holdTimestamp, pane);
   }
 
-  /** Returns a timer for the given timestamp with a user specified payload. */
-  public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+  /**
+   * Returns a cleared timer for the given {@code userKey}, {@code dynamicTimerTag} and {@code
+   * windows}.
+   */
+  public static <T> Timer<T> cleared(
+      T userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> windows) {
+    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, null, null);
   }
 
+  /** Returns the key that the timer is set on. */
+  public abstract T getUserKey();
+
   /**
-   * Returns the timestamp of when the timer is scheduled to fire.
+   * Returns the tag that the timer is set on. The tag is {@code ""} when the timer is for a {@link
+   * TimerSpec}.
+   */
+  public abstract String getDynamicTimerTag();
+
+  /** Returns the windows which are associated with the timer. */
+  public abstract Collection<? extends BoundedWindow> getWindows();
+
+  /** Returns whether the timer is going to be cleared. */
+  public abstract boolean getClearBit();
+
+  /**
+   * Returns the timestamp of when the timer is scheduled to fire. This field is nullable only when
+   * the timer is being cleared.
    *
-   * <p>The time is relative to the time domain defined in the {@link
+   * <p>The time is absolute within the time domain defined in the {@link
    * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
    * timer.
    */
-  public abstract Instant getTimestamp();
+  @Nullable
+  public abstract Instant getFireTimestamp();
+
+  /**
+   * Returns the timestamp at which the timer is supposed to hold the watermark. This field is
+   * nullable only when the timer is being cleared.
+   */
+  @Nullable
+  public abstract Instant getHoldTimestamp();
 
-  /** A user supplied payload. */
+  /**
+   * Returns the {@link PaneInfo} that is related to the timer. This field is nullable only when the
+   * timer is being cleared.
+   */
   @Nullable
-  public abstract T getPayload();
+  public abstract PaneInfo getPane();
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof Timer)) {
+      return false;
+    }
+    Timer<?> that = (Timer<?>) other;
+    return Objects.equals(this.getUserKey(), that.getUserKey())
+        && Objects.equals(this.getDynamicTimerTag(), that.getDynamicTimerTag())
+        && Objects.equals(this.getWindows(), that.getWindows())
+        && (this.getClearBit() == that.getClearBit())
+        && Objects.equals(this.getFireTimestamp(), that.getFireTimestamp())
+        && Objects.equals(this.getHoldTimestamp(), that.getHoldTimestamp())
+        && Objects.equals(this.getPane(), that.getPane());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash only the millis of the timestamp to be consistent with equals
+    if (getClearBit()) {
+      return Objects.hash(getUserKey(), getDynamicTimerTag(), getClearBit(), getWindows());
+    }
+    return Objects.hash(
+        getUserKey(),
+        getDynamicTimerTag(),
+        getClearBit(),
+        getFireTimestamp().getMillis(),
+        getHoldTimestamp().getMillis(),
+        getWindows(),
+        getPane());
+  }
 
   /**
    * A {@link org.apache.beam.sdk.coders.Coder} for timers.
    *
-   * <p>This coder is deterministic if the payload coder is deterministic.
+   * <p>This coder is deterministic if both the key coder and window coder are deterministic.
    *
-   * <p>This coder is inexpensive for size estimation of elements if the payload coder is
-   * inexpensive for size estimation.
+   * <p>This coder is inexpensive for size estimation of elements if the key coder and window coder
+   * are inexpensive for size estimation.
    */
   public static class Coder<T> extends StructuredCoder<Timer<T>> {
 
-    public static <T> Coder of(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      return new Coder(payloadCoder);
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(keyCoder, windowCoder);
     }
 
-    private final org.apache.beam.sdk.coders.Coder<T> payloadCoder;
-
-    private Coder(org.apache.beam.sdk.coders.Coder<T> payloadCoder) {
-      this.payloadCoder = payloadCoder;
+    private final org.apache.beam.sdk.coders.Coder<T> keyCoder;
+    private final org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>>
+        windowsCoder;
+    private final org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder;
+
+    private Coder(
+        org.apache.beam.sdk.coders.Coder<T> keyCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.windowCoder = windowCoder;
+      this.keyCoder = keyCoder;
+      this.windowsCoder = (org.apache.beam.sdk.coders.Coder) CollectionCoder.of(windowCoder);
     }
 
     @Override
     public void encode(Timer<T> timer, OutputStream outStream) throws CoderException, IOException {
-      InstantCoder.of().encode(timer.getTimestamp(), outStream);
-      payloadCoder.encode(timer.getPayload(), outStream);
+      keyCoder.encode(timer.getUserKey(), outStream);
+      StringUtf8Coder.of().encode(timer.getDynamicTimerTag(), outStream);
+      windowsCoder.encode(timer.getWindows(), outStream);
+      BooleanCoder.of().encode(timer.getClearBit(), outStream);
+      if (!timer.getClearBit()) {
+        InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
+        InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
+        PaneInfoCoder.INSTANCE.encode(timer.getPane(), outStream);
+      }
     }
 
     @Override
     public Timer<T> decode(InputStream inStream) throws CoderException, IOException {
-      Instant instant = InstantCoder.of().decode(inStream);
-      T value = payloadCoder.decode(inStream);
-      return Timer.of(instant, value);
+      T userKey = keyCoder.decode(inStream);
+      String dynamicTimerTag = StringUtf8Coder.of().decode(inStream);
+      Collection<? extends BoundedWindow> windows = windowsCoder.decode(inStream);
+      boolean clearBit = BooleanCoder.of().decode(inStream);
+      if (clearBit) {
+        return Timer.cleared(userKey, dynamicTimerTag, windows);
+      }
+      Instant fireTimestamp = InstantCoder.of().decode(inStream);
+      Instant holdTimestamp = InstantCoder.of().decode(inStream);
+      PaneInfo pane = PaneInfoCoder.INSTANCE.decode(inStream);
+      return Timer.of(userKey, dynamicTimerTag, windows, fireTimestamp, holdTimestamp, pane);
     }
 
     @Override
     public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
-      return Collections.singletonList(payloadCoder);
+      return Collections.singletonList(keyCoder);
     }
 
     @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      verifyDeterministic(this, "Payload coder must be deterministic", payloadCoder);
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
+      return Arrays.asList(keyCoder, windowCoder);
     }
 
-    @Override
-    public boolean consistentWithEquals() {
-      return payloadCoder.consistentWithEquals();
+    public org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> getWindowCoder() {
+      return windowCoder;
     }
 
-    @Override
-    public Object structuralValue(Timer<T> value) {
-      return Timer.of(value.getTimestamp(), payloadCoder.structuralValue(value.getPayload()));
+    public org.apache.beam.sdk.coders.Coder<Collection<? extends BoundedWindow>> getWindowsCoder() {
+      return windowsCoder;
+    }
+
+    public org.apache.beam.sdk.coders.Coder<T> getValueCoder() {
+      return keyCoder;
     }
 
     @Override
-    public boolean isRegisterByteSizeObserverCheap(Timer<T> value) {
-      return payloadCoder.isRegisterByteSizeObserverCheap(value.getPayload());
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "UserKey coder must be deterministic", keyCoder);
+      verifyDeterministic(this, "Window coder must be deterministic", windowCoder);
     }
 
     @Override
     public void registerByteSizeObserver(Timer<T> value, ElementByteSizeObserver observer)
         throws Exception {
-      InstantCoder.of().registerByteSizeObserver(value.getTimestamp(), observer);
-      payloadCoder.registerByteSizeObserver(value.getPayload(), observer);
+      keyCoder.registerByteSizeObserver(value.getUserKey(), observer);
+      StringUtf8Coder.of().registerByteSizeObserver(value.getDynamicTimerTag(), observer);
+      windowsCoder.registerByteSizeObserver(value.getWindows(), observer);
+      BooleanCoder.of().registerByteSizeObserver(value.getClearBit(), observer);
+      if (!value.getClearBit()) {
+        InstantCoder.of().registerByteSizeObserver(value.getFireTimestamp(), observer);
+        InstantCoder.of().registerByteSizeObserver(value.getHoldTimestamp(), observer);
+        PaneInfoCoder.INSTANCE.registerByteSizeObserver(value.getPane(), observer);
+      }
     }
   }
 }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index b2adc2b..e6b45c4 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -74,7 +74,7 @@ public class CoderTranslationTest {
           .add(StringUtf8Coder.of())
           .add(IntervalWindowCoder.of())
           .add(IterableCoder.of(ByteArrayCoder.of()))
-          .add(Timer.Coder.of(ByteArrayCoder.of()))
+          .add(Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE))
           .add(LengthPrefixCoder.of(IterableCoder.of(VarLongCoder.of())))
           .add(GlobalWindow.Coder.INSTANCE)
           .add(
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 2ac9db5..fe08582 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -255,10 +255,34 @@ public class CommonCoderTest {
       return ((Number) value).longValue();
     } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
       Map<String, Object> kvMap = (Map<String, Object>) value;
-      Coder<?> payloadCoder = (Coder) coder.getCoderArguments().get(0);
+      Coder<?> keyCoder = ((Timer.Coder) coder).getValueCoder();
+      Coder<? extends BoundedWindow> windowCoder = ((Timer.Coder) coder).getWindowCoder();
+      List<BoundedWindow> windows = new ArrayList<>();
+      for (Object window : (List<Object>) kvMap.get("windows")) {
+        windows.add(
+            (BoundedWindow) convertValue(window, coderSpec.getComponents().get(1), windowCoder));
+      }
+      if ((boolean) kvMap.get("clearBit")) {
+        return Timer.cleared(
+            convertValue(kvMap.get("userKey"), coderSpec.getComponents().get(0), keyCoder),
+            (String) kvMap.get("dynamicTimerTag"),
+            windows);
+      }
+      Map<String, Object> paneInfoMap = (Map<String, Object>) kvMap.get("pane");
+      PaneInfo paneInfo =
+          PaneInfo.createPane(
+              (boolean) paneInfoMap.get("is_first"),
+              (boolean) paneInfoMap.get("is_last"),
+              PaneInfo.Timing.valueOf((String) paneInfoMap.get("timing")),
+              (int) paneInfoMap.get("index"),
+              (int) paneInfoMap.get("on_time_index"));
       return Timer.of(
-          new Instant(((Number) kvMap.get("timestamp")).longValue()),
-          convertValue(kvMap.get("payload"), coderSpec.getComponents().get(0), payloadCoder));
+          convertValue(kvMap.get("userKey"), coderSpec.getComponents().get(0), keyCoder),
+          (String) kvMap.get("dynamicTimerTag"),
+          windows,
+          new Instant(((Number) kvMap.get("fireTimestamp")).longValue()),
+          new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
+          paneInfo);
     } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
       Map<String, Object> kvMap = (Map<String, Object>) value;
       Instant end = new Instant(((Number) kvMap.get("end")).longValue());
@@ -434,8 +458,7 @@ public class CommonCoderTest {
       assertFalse(expectedValueIterator.hasNext());
 
     } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
-      assertEquals(((Timer) expectedValue).getTimestamp(), ((Timer) actualValue).getTimestamp());
-      assertThat(((Timer) expectedValue).getPayload(), equalTo(((Timer) actualValue).getPayload()));
+      assertEquals((Timer) expectedValue, (Timer) actualValue);
 
     } else if (s.equals(getUrn(StandardCoders.Enum.GLOBAL_WINDOW))) {
       assertEquals(expectedValue, actualValue);
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index 4e634d0..de961b5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -34,7 +34,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.state.BagState;
@@ -59,6 +58,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -207,8 +207,10 @@ public class ParDoTranslationTest {
             CoderTranslation.fromProto(
                 components.getCodersOrThrow(timerKvCoderComponents.valueCoderId()),
                 rehydratedComponents);
+        // TODO(BEAM-9562) Plumb through actual Timer value coder.
         assertEquals(
-            org.apache.beam.runners.core.construction.Timer.Coder.of(VoidCoder.of()),
+            org.apache.beam.runners.core.construction.Timer.Coder.of(
+                StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE),
             timerValueCoder);
       }
     }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
index ac3bd70..f3d12e8 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TimerTest.java
@@ -17,14 +17,16 @@
  */
 package org.apache.beam.runners.core.construction;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import java.util.Collections;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -33,42 +35,128 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link Timer}. */
 @RunWith(JUnit4.class)
 public class TimerTest {
-  private static final Instant INSTANT = Instant.now();
+  private static final Instant FIRE_TIME = new Instant(123L);
+  private static final Instant HOLD_TIME = new Instant(456L);
 
   @Test
-  public void testTimer() {
-    Timer<Void> timerA = Timer.of(INSTANT);
-    assertEquals(INSTANT, timerA.getTimestamp());
-    assertNull(timerA.getPayload());
+  public void testClearTimer() {
+    Timer<String> clearedTimer =
+        Timer.cleared("timer", "tag", Collections.singleton(GlobalWindow.INSTANCE));
+    assertTrue(clearedTimer.getClearBit());
+    assertEquals("timer", clearedTimer.getUserKey());
+    assertEquals("tag", clearedTimer.getDynamicTimerTag());
+    assertEquals(Collections.singleton(GlobalWindow.INSTANCE), clearedTimer.getWindows());
+  }
 
-    Timer<String> timerB = Timer.of(INSTANT, "ABC");
-    assertEquals(INSTANT, timerB.getTimestamp());
-    assertEquals("ABC", timerB.getPayload());
+  @Test
+  public void testTimer() {
+    Timer<String> timer =
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singleton(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING);
+    assertEquals("key", timer.getUserKey());
+    assertEquals("tag", timer.getDynamicTimerTag());
+    assertEquals(FIRE_TIME, timer.getFireTimestamp());
+    assertEquals(HOLD_TIME, timer.getHoldTimestamp());
+    assertEquals(Collections.singleton(GlobalWindow.INSTANCE), timer.getWindows());
+    assertEquals(PaneInfo.NO_FIRING, timer.getPane());
+    assertFalse(timer.getClearBit());
   }
 
   @Test
-  public void testTimerCoderWithInconsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<byte[]>> coder = Timer.Coder.of(ByteArrayCoder.of());
+  public void testTimerCoderWithInconsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
     CoderProperties.coderSerializable(coder);
     CoderProperties.structuralValueDecodeEncodeEqual(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singleton(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING));
+    CoderProperties.structuralValueDecodeEncodeEqual(
+        coder, Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)));
     CoderProperties.structuralValueConsistentWithEquals(
-        coder, Timer.of(INSTANT, "ABC".getBytes(UTF_8)), Timer.of(INSTANT, "ABC".getBytes(UTF_8)));
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singleton(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singleton(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING));
+    CoderProperties.structuralValueConsistentWithEquals(
+        coder,
+        Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)),
+        Timer.cleared("key", "tag", Collections.singleton(GlobalWindow.INSTANCE)));
   }
 
   @Test
-  public void testTimerCoderWithConsistentWithEqualsPayloadCoder() throws Exception {
-    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
-    CoderProperties.coderDecodeEncodeEqual(coder, Timer.of(INSTANT, "ABC"));
+  public void testTimerCoderWithConsistentWithEqualsComponentCoders() throws Exception {
+    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
+    CoderProperties.coderDecodeEncodeEqual(
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING));
+    CoderProperties.coderDecodeEncodeEqual(
+        coder, Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
     CoderProperties.coderConsistentWithEquals(
-        coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
-    CoderProperties.coderDeterministic(coder, Timer.of(INSTANT, "ABC"), Timer.of(INSTANT, "ABC"));
-  }
-
-  @Test
-  public void testTimerCoderWireFormat() throws Exception {
-    Coder<Timer<String>> coder = Timer.Coder.of(StringUtf8Coder.of());
-    CoderProperties.coderEncodesBase64(
-        coder, Timer.of(new Instant(255L), "ABC"), "gAAAAAAAAP8DQUJD");
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING));
+    CoderProperties.coderConsistentWithEquals(
+        coder,
+        Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)),
+        Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
+    CoderProperties.coderDeterministic(
+        coder,
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING),
+        Timer.of(
+            "key",
+            "tag",
+            Collections.singletonList(GlobalWindow.INSTANCE),
+            FIRE_TIME,
+            HOLD_TIME,
+            PaneInfo.NO_FIRING));
+    CoderProperties.coderDeterministic(
+        coder,
+        Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)),
+        Timer.cleared("key", "tag", Collections.singletonList(GlobalWindow.INSTANCE)));
   }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 72e5c5d..7e5287a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -83,6 +83,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -685,7 +686,15 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
               timerId);
       WindowedValue<KV<Object, Timer>> timerValue =
           WindowedValue.of(
-              KV.of(timerKey, Timer.of(timestamp, new byte[0])),
+              KV.of(
+                  timerKey,
+                  Timer.of(
+                      "",
+                      "",
+                      Collections.singleton(GlobalWindow.INSTANCE),
+                      timestamp,
+                      timestamp,
+                      PaneInfo.NO_FIRING)),
               outputTimestamp,
               Collections.singleton(window),
               PaneInfo.NO_FIRING);
@@ -749,7 +758,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
                 TimerInternals.TimerData.of(
                     timerSpec.inputCollectionId(),
                     namespace,
-                    timer.getTimestamp(),
+                    timer.getFireTimestamp(),
                     timerSpec.getTimerSpec().getTimeDomain());
             timerRegistration.accept(windowedValue, timerData);
           }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 89b2b7d..c8b25ec 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -120,10 +120,14 @@ public class TimerReceiver {
 
         TimerInternals timerInternals = stepContext.namespacedToUser().timerInternals();
         timerInternals.setTimer(
-            namespace, timerId, "", timer.getTimestamp(), windowedValue.getTimestamp(), timeDomain);
+            namespace,
+            timerId,
+            "",
+            timer.getFireTimestamp(),
+            windowedValue.getTimestamp(),
+            timeDomain);
 
         timerIdToKey.put(timerId, windowedValue.getValue().getKey());
-        timerIdToPayload.put(timerId, timer.getPayload());
       }
       return true;
     }
@@ -143,7 +147,13 @@ public class TimerReceiver {
           WindowedValue.of(
               KV.of(
                   timerIdToKey.get(timerData.getTimerId()),
-                  Timer.of(timerData.getTimestamp(), timerIdToPayload.get(timerData.getTimerId()))),
+                  Timer.of(
+                      "",
+                      "",
+                      Collections.singleton(GlobalWindow.INSTANCE),
+                      timerData.getOutputTimestamp(),
+                      timerData.getOutputTimestamp(),
+                      PaneInfo.NO_FIRING)),
               timerData.getOutputTimestamp(),
               Collections.singleton(window),
               PaneInfo.NO_FIRING);
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
index 550115d..56a771b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import static org.junit.Assert.assertTrue;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -67,7 +68,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -507,13 +507,17 @@ public class TimerReceiverTest implements Serializable {
     }
   }
 
-  private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
+  private KV<String, org.apache.beam.runners.core.construction.Timer<String>> timerBytes(
       String key, long timestampOffset) throws CoderException {
     return KV.of(
         key,
         org.apache.beam.runners.core.construction.Timer.of(
+            "",
+            "",
+            Collections.singleton(GlobalWindow.INSTANCE),
             BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
-            CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED)));
+            BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
+            PaneInfo.NO_FIRING));
   }
 
   private static DataflowExecutionContext.DataflowStepContext buildDataflowStepContext() {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index a277b9e..1a1da66 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -82,7 +82,7 @@ public class TimerReceiverFactory implements OutputReceiverFactory {
         TimeDomain timeDomain = timerSpec.getTimerSpec().getTimeDomain();
         String timerId = timerSpec.inputCollectionId();
         TimerInternals.TimerData timerData =
-            TimerInternals.TimerData.of(timerId, namespace, timer.getTimestamp(), timeDomain);
+            TimerInternals.TimerData.of(timerId, namespace, timer.getFireTimestamp(), timeDomain);
         timerDataConsumer.accept(windowedValue, timerData);
       }
     };
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 56b8179..d67776c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.core.construction.graph.PipelineNode;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -142,7 +143,15 @@ public final class PipelineTranslatorUtils {
     Instant outputTimestamp = timer.getOutputTimestamp();
     WindowedValue<KV<Object, Timer>> timerValue =
         WindowedValue.of(
-            KV.of(currentTimerKey, Timer.of(timestamp, new byte[0])),
+            KV.of(
+                currentTimerKey,
+                Timer.of(
+                    "",
+                    "",
+                    Collections.singleton(GlobalWindow.INSTANCE),
+                    timestamp,
+                    timestamp,
+                    PaneInfo.NO_FIRING)),
             outputTimestamp,
             Collections.singleton(window),
             PaneInfo.NO_FIRING);
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 365f679..b2a8c1f 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -74,12 +74,10 @@ import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSi
 import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.SideInputHandlerFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
 import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
@@ -105,6 +103,7 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
@@ -1251,25 +1250,32 @@ public class RemoteExecutionTest implements Serializable {
     return KV.of(key, CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
   }
 
-  private KV<String, org.apache.beam.runners.core.construction.Timer<byte[]>> timerBytes(
+  private KV<String, org.apache.beam.runners.core.construction.Timer<String>> timerBytes(
       String key, long timestampOffset) throws CoderException {
     return KV.of(
         key,
         org.apache.beam.runners.core.construction.Timer.of(
+            "",
+            "",
+            Collections.singleton(GlobalWindow.INSTANCE),
             BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
-            CoderUtils.encodeToByteArray(VoidCoder.of(), null, Coder.Context.NESTED)));
+            BoundedWindow.TIMESTAMP_MIN_VALUE.plus(timestampOffset),
+            PaneInfo.NO_FIRING));
   }
 
-  private Object timerStructuralValue(Object timer) {
+  private Object timerStructuralValue(WindowedValue<?> timer) {
     return WindowedValue.FullWindowedValueCoder.of(
             KvCoder.of(
                 StringUtf8Coder.of(),
-                org.apache.beam.runners.core.construction.Timer.Coder.of(ByteArrayCoder.of())),
+                org.apache.beam.runners.core.construction.Timer.Coder.of(
+                    StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE)),
             GlobalWindow.Coder.INSTANCE)
-        .structuralValue(timer);
+        .structuralValue(
+            (WindowedValue<KV<String, org.apache.beam.runners.core.construction.Timer<String>>>)
+                timer);
   }
 
-  private Collection<Object> timerStructuralValues(Collection<?> timers) {
+  private Collection<Object> timerStructuralValues(Collection<WindowedValue<?>> timers) {
     return Collections2.transform(timers, this::timerStructuralValue);
   }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 49aa850..8c20237 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -25,6 +25,7 @@ import com.google.auto.service.AutoService;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.function.BiFunction;
@@ -974,7 +975,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       outputTo(
           consumers,
           WindowedValue.of(
-              KV.of(key, Timer.of(scheduledTime)),
+              KV.of(
+                  key,
+                  Timer.of(
+                      "",
+                      "",
+                      Collections.singleton(GlobalWindow.INSTANCE),
+                      scheduledTime,
+                      scheduledTime,
+                      PaneInfo.NO_FIRING)),
               currentOutputTimestamp,
               currentElementOrTimer.getWindows(),
               currentElementOrTimer.getPane()));
@@ -1362,7 +1371,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
       @Override
       public Instant fireTimestamp() {
-        return currentTimer.getValue().getValue().getTimestamp();
+        return currentTimer.getValue().getValue().getFireTimestamp();
       }
 
       @Override
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 7ed8b82..bf0716f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -33,6 +33,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.control.BundleSplitListener;
@@ -988,7 +989,15 @@ public class FnApiDoFnRunnerTest implements Serializable {
       WindowedValue<KV<T, org.apache.beam.runners.core.construction.Timer>> timerInGlobalWindow(
           T value, Instant valueTimestamp, Instant scheduledTimestamp) {
     return timestampedValueInGlobalWindow(
-        KV.of(value, org.apache.beam.runners.core.construction.Timer.of(scheduledTimestamp)),
+        KV.of(
+            value,
+            org.apache.beam.runners.core.construction.Timer.of(
+                "",
+                "",
+                Collections.singleton(GlobalWindow.INSTANCE),
+                scheduledTimestamp,
+                scheduledTimestamp,
+                PaneInfo.NO_FIRING)),
         valueTimestamp);
   }
 
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index a2090a9..8fdc4a4 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -629,22 +629,55 @@ class TimestampCoderImpl(StreamCoderImpl):
 
 class TimerCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees."""
-  def __init__(self, payload_coder_impl):
+  def __init__(self, key_coder_impl, window_coder_impl):
     self._timestamp_coder_impl = TimestampCoderImpl()
-    self._payload_coder_impl = payload_coder_impl
+    self._boolean_coder_impl = BooleanCoderImpl()
+    self._pane_info_coder_impl = PaneInfoCoderImpl()
+    self._key_coder_impl = key_coder_impl
+    self._windows_coder_impl = TupleSequenceCoderImpl(window_coder_impl)
+    from apache_beam.coders.coders import StrUtf8Coder
+    self._tag_coder_impl = StrUtf8Coder().get_impl()
 
   def encode_to_stream(self, value, out, nested):
     # type: (dict, create_OutputStream, bool) -> None
-    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
-    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+    self._key_coder_impl.encode_to_stream(value.user_key, out, True)
+    self._tag_coder_impl.encode_to_stream(value.dynamic_timer_tag, out, True)
+    self._windows_coder_impl.encode_to_stream(value.windows, out, True)
+    self._boolean_coder_impl.encode_to_stream(value.clear_bit, out, True)
+    if not value.clear_bit:
+      self._timestamp_coder_impl.encode_to_stream(
+          value.fire_timestamp, out, True)
+      self._timestamp_coder_impl.encode_to_stream(
+          value.hold_timestamp, out, True)
+      self._pane_info_coder_impl.encode_to_stream(value.paneinfo, out, True)
 
   def decode_from_stream(self, in_stream, nested):
     # type: (create_InputStream, bool) -> dict
-    # TODO(robertwb): Consider using a concrete class rather than a dict here.
-    return dict(
-        timestamp=self._timestamp_coder_impl.decode_from_stream(
+    from apache_beam.transforms import userstate
+    user_key = self._key_coder_impl.decode_from_stream(in_stream, True)
+    dynamic_timer_tag = self._tag_coder_impl.decode_from_stream(in_stream, True)
+    windows = self._windows_coder_impl.decode_from_stream(in_stream, True)
+    clear_bit = self._boolean_coder_impl.decode_from_stream(in_stream, True)
+    if clear_bit:
+      return userstate.Timer(
+          user_key=user_key,
+          dynamic_timer_tag=dynamic_timer_tag,
+          windows=windows,
+          clear_bit=clear_bit,
+          fire_timestamp=None,
+          hold_timestamp=None,
+          paneinfo=None)
+
+    return userstate.Timer(
+        user_key=user_key,
+        dynamic_timer_tag=dynamic_timer_tag,
+        windows=windows,
+        clear_bit=clear_bit,
+        fire_timestamp=self._timestamp_coder_impl.decode_from_stream(
             in_stream, True),
-        payload=self._payload_coder_impl.decode_from_stream(in_stream, True))
+        hold_timestamp=self._timestamp_coder_impl.decode_from_stream(
+            in_stream, True),
+        paneinfo=self._pane_info_coder_impl.decode_from_stream(in_stream, True))
 
 
 small_ints = [chr(_).encode('latin-1') for _ in range(128)]
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index b14ccf0..e7ce77b 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -588,28 +588,32 @@ class _TimerCoder(FastCoder):
   """A coder used for timer values.
 
   For internal use."""
-  def __init__(self, payload_coder):
+  def __init__(self, key_coder, window_coder):
     # type: (Coder) -> None
-    self._payload_coder = payload_coder
+    self._key_coder = key_coder
+    self._window_coder = window_coder
 
   def _get_component_coders(self):
     # type: () -> List[Coder]
-    return [self._payload_coder]
+    return [self._key_coder, self._window_coder]
 
   def _create_impl(self):
-    return coder_impl.TimerCoderImpl(self._payload_coder.get_impl())
+    return coder_impl.TimerCoderImpl(
+        self._key_coder.get_impl(), self._window_coder.get_impl())
 
   def is_deterministic(self):
     # () -> bool
-    return self._payload_coder.is_deterministic()
+    return (
+        self._key_coder.is_deterministic() and
+        self._window_coder.is_deterministic())
 
   def __eq__(self, other):
     return (
-        type(self) == type(other) and
-        self._payload_coder == other._payload_coder)
+        type(self) == type(other) and self._key_coder == other._key_coder and
+        self._window_coder == other._window_coder)
 
   def __hash__(self):
-    return hash(type(self)) + hash(self._payload_coder)
+    return hash(type(self)) + hash(self._key_coder) + hash(self._window_coder)
 
 
 Coder.register_structured_urn(common_urns.coders.TIMER.urn, _TimerCoder)
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index fd21261..63dde4e 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -32,6 +32,7 @@ from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.internal import pickler
 from apache_beam.runners import pipeline_context
+from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.utils import timestamp
@@ -80,8 +81,9 @@ class CodersTest(unittest.TestCase):
         coders.RunnerAPICoderHolder,
         coders.ToBytesCoder
     ])
-    assert not standard - cls.seen, standard - cls.seen
-    assert not standard - cls.seen_nested, standard - cls.seen_nested
+    cls.seen_nested -= set([coders.ProtoCoder, CustomCoder])
+    assert not standard - cls.seen
+    assert not cls.seen_nested - standard
 
   @classmethod
   def _observe(cls, coder):
@@ -229,14 +231,25 @@ class CodersTest(unittest.TestCase):
 
   def test_timer_coder(self):
     self.check_coder(
-        coders._TimerCoder(coders.BytesCoder()),
-        *[{
-            'timestamp': timestamp.Timestamp(micros=x), 'payload': b'xyz'
-        } for x in (-3000, 0, 3000)])
-    self.check_coder(
-        coders.TupleCoder((coders._TimerCoder(coders.VarIntCoder()), )), ({
-            'timestamp': timestamp.Timestamp.of(37000), 'payload': 389
-        }, ))
+        coders._TimerCoder(coders.StrUtf8Coder(), coders.GlobalWindowCoder()),
+        *[
+            userstate.Timer(
+                user_key="key",
+                dynamic_timer_tag="tag",
+                windows=(GlobalWindow(), ),
+                clear_bit=True,
+                fire_timestamp=None,
+                hold_timestamp=None,
+                paneinfo=None),
+            userstate.Timer(
+                user_key="key",
+                dynamic_timer_tag="tag",
+                windows=(GlobalWindow(), ),
+                clear_bit=False,
+                fire_timestamp=timestamp.Timestamp.of(123),
+                hold_timestamp=timestamp.Timestamp.of(456),
+                paneinfo=windowed_value.PANE_INFO_UNKNOWN)
+        ])
 
   def test_tuple_coder(self):
     kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 9e118a0..0cc5271 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -38,6 +38,7 @@ from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import schema_pb2
 from apache_beam.runners import pipeline_context
+from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 from apache_beam.transforms.window import IntervalWindow
 from apache_beam.typehints import schemas
@@ -145,9 +146,27 @@ class StandardCodersTest(unittest.TestCase):
               x['pane']['index'],
               x['pane']['on_time_index'])),
       'beam:coder:timer:v1': lambda x,
-      payload_parser: dict(
-          payload=payload_parser(x['payload']),
-          timestamp=Timestamp(micros=x['timestamp'] * 1000)),
+      value_parser,
+      window_parser: userstate.Timer(
+          user_key=value_parser(x['userKey']),
+          dynamic_timer_tag=x['dynamicTimerTag'],
+          clear_bit=x['clearBit'],
+          windows=tuple([window_parser(w) for w in x['windows']]),
+          fire_timestamp=None,
+          hold_timestamp=None,
+          paneinfo=None) if x['clearBit'] else userstate.Timer(
+              user_key=value_parser(x['userKey']),
+              dynamic_timer_tag=x['dynamicTimerTag'],
+              clear_bit=x['clearBit'],
+              fire_timestamp=Timestamp(micros=x['fireTimestamp'] * 1000),
+              hold_timestamp=Timestamp(micros=x['holdTimestamp'] * 1000),
+              windows=tuple([window_parser(w) for w in x['windows']]),
+              paneinfo=PaneInfo(
+                  x['pane']['is_first'],
+                  x['pane']['is_last'],
+                  PaneInfoTiming.from_string(x['pane']['timing']),
+                  x['pane']['index'],
+                  x['pane']['on_time_index'])),
       'beam:coder:double:v1': parse_float,
   }
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 9180695..b0f7024 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -580,16 +580,36 @@ class OutputTimer(object):
 
   def set(self, ts):
     ts = timestamp.Timestamp.of(ts)
+    # TODO(BEAM-9562): Plumb through actual timer fields.
     self._receiver.receive(
-        windowed_value.WindowedValue((self._key, dict(timestamp=ts)),
+        windowed_value.WindowedValue((
+            self._key,
+            userstate.Timer(
+                user_key='',
+                dynamic_timer_tag='',
+                windows=(self._window, ),
+                clear_bit=False,
+                fire_timestamp=ts,
+                hold_timestamp=ts,
+                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
                                      ts, (self._window, )))
 
   def clear(self):
     # type: () -> None
     dummy_millis = int(common_urns.constants.MAX_TIMESTAMP_MILLIS.constant) + 1
     clear_ts = timestamp.Timestamp(micros=dummy_millis * 1000)
+    # TODO(BEAM-9562): Plumb through actual fields; clear_bit is still False.
     self._receiver.receive(
-        windowed_value.WindowedValue((self._key, dict(timestamp=clear_ts)),
+        windowed_value.WindowedValue((
+            self._key,
+            userstate.Timer(
+                user_key='',
+                dynamic_timer_tag='',
+                windows=(self._window, ),
+                clear_bit=False,
+                fire_timestamp=timestamp.Timestamp.of(clear_ts),
+                hold_timestamp=timestamp.Timestamp.of(0),
+                paneinfo=windowed_value.PANE_INFO_UNKNOWN)),
                                      0, (self._window, )))
 
 
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 6d71dc5..6250f38 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -713,7 +713,7 @@ class DoOperation(Operation):
     key, timer_data = windowed_timer.value
     timer_spec = self.timer_specs[tag]
     self.dofn_runner.process_user_timer(
-        timer_spec, key, windowed_timer.windows[0], timer_data['timestamp'])
+        timer_spec, key, windowed_timer.windows[0], timer_data.fire_timestamp)
 
   def finish(self):
     # type: () -> None
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index f3ffa88..e91374f 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -26,6 +26,7 @@ from __future__ import absolute_import
 
 import types
 from builtins import object
+from collections import namedtuple
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
@@ -129,6 +130,18 @@ class CombiningValueStateSpec(StateSpec):
             accumulator_coder_id=context.coders.get_id(self.coder)))
 
 
+Timer = namedtuple(
+    'Timer',
+    'user_key '
+    'dynamic_timer_tag '
+    'windows '
+    'clear_bit '
+    'fire_timestamp '
+    'hold_timestamp '
+    'paneinfo')
+
+
+# TODO(BEAM-9562): Plumb through actual key_coder and window_coder.
 class TimerSpec(object):
   """Specification for a user stateful DoFn timer."""
   prefix = "ts-"
@@ -148,7 +161,8 @@ class TimerSpec(object):
     return beam_runner_api_pb2.TimerFamilySpec(
         time_domain=TimeDomain.to_runner_api(self.time_domain),
         timer_family_coder_id=context.coders.get_id(
-            coders._TimerCoder(coders.SingletonCoder(None))))
+            coders._TimerCoder(
+                coders.StrUtf8Coder(), coders.GlobalWindowCoder())))
 
 
 # TODO(BEAM-9602): Provide support for dynamic timer.