You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/03/27 04:21:22 UTC

[beam] branch master updated: Remove TimerSpec from proto

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 39703c6  Remove TimerSpec from proto
     new 33d78c5  Merge pull request #11216 from boyuanzz/timer_spec
39703c6 is described below

commit 39703c640b785349719f05278c8ee8b1944076d6
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Mar 24 14:53:29 2020 -0700

    Remove TimerSpec from proto
---
 .../pipeline/src/main/proto/beam_runner_api.proto  | 16 ++-------
 .../core/construction/ParDoTranslation.java        | 32 +++++++----------
 .../runners/core/construction/SplittableParDo.java |  3 +-
 .../beam/runners/core/construction/Timer.java      |  3 +-
 .../graph/GreedyPCollectionFusers.java             |  5 +--
 .../core/construction/graph/PipelineValidator.java |  2 +-
 .../core/construction/graph/QueryablePipeline.java |  4 ++-
 .../construction/graph/ExecutableStageTest.java    |  4 +--
 .../graph/GreedyPipelineFuserTest.java             |  6 ++--
 .../construction/graph/GreedyStageFuserTest.java   |  4 +--
 .../graph/ImmutableExecutableStageTest.java        |  3 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java    |  7 ++--
 .../dataflow/PrimitiveParDoSingleFactory.java      | 10 +++---
 .../graph/CreateExecutableStageNodeFunction.java   |  4 +--
 .../worker/fn/control/TimerReceiverTest.java       | 22 +++++++-----
 .../control/ProcessBundleDescriptors.java          |  8 +++--
 .../beam/sdk/transforms/reflect/DoFnSignature.java |  6 ++++
 .../sdk/transforms/reflect/DoFnSignatures.java     | 19 ++++++----
 .../sdk/transforms/reflect/DoFnInvokersTest.java   | 10 +++---
 .../sdk/transforms/reflect/DoFnSignaturesTest.java | 42 +++++++++++++---------
 .../transforms/reflect/OnTimerInvokersTest.java    | 11 +++---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  4 +--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       | 16 +++++----
 .../runners/portability/fn_api_runner/fn_runner.py |  3 +-
 .../portability/fn_api_runner/translations.py      | 10 +++---
 .../apache_beam/runners/worker/bundle_processor.py | 22 ++++++------
 sdks/python/apache_beam/transforms/core.py         |  2 +-
 sdks/python/apache_beam/transforms/userstate.py    | 30 +++++++++++++---
 .../apache_beam/transforms/userstate_test.py       |  6 ++--
 29 files changed, 181 insertions(+), 133 deletions(-)

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e5d5eb6..17229d2 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -442,14 +442,9 @@ message ParDoPayload {
   // be placed in the pipeline requirements.
   map<string, StateSpec> state_specs = 4;
 
-  // (Optional) A mapping of local timer names to timer specifications.
-  // If this is set, the stateful processing requirement should also
-  // be placed in the pipeline requirements.
-  map<string, TimerSpec> timer_specs = 5;
-
-  // (Optional) A mapping of local timer family names to timer specifications.
-  // If this is set, the stateful processing requirement should also
-  // be placed in the pipeline requirements.
+  // (Optional) A mapping of local timer family names to timer family
+  // specifications. If this is set, the stateful processing requirement should
+  // also be placed in the pipeline requirements.
   map<string, TimerFamilySpec> timer_family_specs = 9;
 
   // (Optional) Only set when this ParDo contains a splittable DoFn.
@@ -507,11 +502,6 @@ message SetStateSpec {
   string element_coder_id = 1;
 }
 
-message TimerSpec {
-  TimeDomain.Enum time_domain = 1;
-  string timer_coder_id = 2;
-}
-
 message TimerFamilySpec {
   TimeDomain.Enum time_domain = 1;
   string timer_family_coder_id = 2;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 256a31d..969ae25 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -185,7 +185,7 @@ public class ParDoTranslation {
 
       // https://s.apache.org/beam-portability-timers
       // Add a PCollection and coder for each timer. Also treat them as inputs and outputs.
-      for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+      for (String localTimerName : payload.getTimerFamilySpecsMap().keySet()) {
         PCollection<?> timerPCollection =
             PCollection.createPrimitiveOutputInternal(
                 // Create a dummy pipeline since we don't want to modify the current
@@ -294,12 +294,14 @@ public class ParDoTranslation {
           }
 
           @Override
-          public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
-            Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+          public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
+              SdkComponents newComponents) {
+            Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
             for (Map.Entry<String, TimerDeclaration> timer :
                 signature.timerDeclarations().entrySet()) {
-              RunnerApi.TimerSpec spec =
-                  translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
+              RunnerApi.TimerFamilySpec spec =
+                  translateTimerFamilySpec(
+                      getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
               timerSpecs.put(timer.getKey(), spec);
             }
 
@@ -538,7 +540,8 @@ public class ParDoTranslation {
     return Iterables.getOnlyElement(
         Sets.difference(
             ptransform.getInputsMap().keySet(),
-            Sets.union(payload.getSideInputsMap().keySet(), payload.getTimerSpecsMap().keySet())));
+            Sets.union(
+                payload.getSideInputsMap().keySet(), payload.getTimerFamilySpecsMap().keySet())));
   }
 
   /** Translate state specs. */
@@ -654,15 +657,6 @@ public class ParDoTranslation {
     }
   }
 
-  public static RunnerApi.TimerSpec translateTimerSpec(TimerSpec timer, SdkComponents components) {
-    return RunnerApi.TimerSpec.newBuilder()
-        .setTimeDomain(translateTimeDomain(timer.getTimeDomain()))
-        // TODO: Add support for timer payloads to the SDK
-        // We currently assume that all payloads are unspecified.
-        .setTimerCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of())))
-        .build();
-  }
-
   public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
       TimerSpec timer, SdkComponents components) {
     return RunnerApi.TimerFamilySpec.newBuilder()
@@ -756,9 +750,7 @@ public class ParDoTranslation {
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0
-        || payload.getTimerSpecsCount() > 0
-        || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
   }
 
   public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
@@ -783,7 +775,7 @@ public class ParDoTranslation {
     Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
         throws IOException;
 
-    Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents);
+    Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(SdkComponents newComponents);
 
     Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(SdkComponents newComponents);
 
@@ -822,7 +814,7 @@ public class ParDoTranslation {
     return ParDoPayload.newBuilder()
         .setDoFn(parDo.translateDoFn(components))
         .putAllStateSpecs(parDo.translateStateSpecs(components))
-        .putAllTimerSpecs(parDo.translateTimerSpecs(components))
+        .putAllTimerFamilySpecs(parDo.translateTimerSpecs(components))
         .putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
         .putAllSideInputs(parDo.translateSideInputs(components))
         .setRequiresStableInput(parDo.isRequiresStableInput())
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 4dab59e..86384ef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -32,7 +32,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
 import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
 import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
@@ -403,7 +402,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT, WatermarkEstimatorSt
                 }
 
                 @Override
-                public Map<String, TimerSpec> translateTimerSpecs(SdkComponents components) {
+                public Map<String, TimerFamilySpec> translateTimerSpecs(SdkComponents components) {
                   // SDFs don't have timers.
                   return ImmutableMap.of();
                 }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 072dbc7..66afe7c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -54,7 +54,8 @@ public abstract class Timer<T> {
    * Returns the timestamp of when the timer is scheduled to fire.
    *
    * <p>The time is relative to the time domain defined in the {@link
-   * org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec} that is associated with this timer.
+   * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
+   * timer.
    */
   public abstract Instant getTimestamp();
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index 3d7d414..b3f18a2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -206,12 +206,13 @@ class GreedyPCollectionFusers {
     try {
       ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
       if (Maps.filterKeys(
-              parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+              parDo.getTransform().getInputsMap(),
+              s -> payload.getTimerFamilySpecsMap().containsKey(s))
           .values()
           .contains(candidate.getId())) {
         // Allow fusion across timer PCollections because they are a self loop.
         return true;
-      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+      } else if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
         // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
         // a key must execute serially. To avoid checking if the rest of the stage is
         // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index c80947b..34e74a7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -229,7 +229,7 @@ public class PipelineValidator {
           id,
           sideInputId);
     }
-    if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+    if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
       checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
       // TODO: Validate state_specs and timer_specs
     }
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 099294d..1f5e381 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -414,7 +414,9 @@ public class QueryablePipeline {
   private Set<String> getLocalTimerNames(PTransform transform) {
     if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) {
       try {
-        return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getTimerSpecsMap().keySet();
+        return ParDoPayload.parseFrom(transform.getSpec().getPayload())
+            .getTimerFamilySpecsMap()
+            .keySet();
       } catch (InvalidProtocolBufferException e) {
         throw new RuntimeException(e);
       }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index 9a69ed8..6101e6a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -68,7 +68,7 @@ public class ExecutableStageTest {
                             .setDoFn(FunctionSpec.newBuilder())
                             .putSideInputs("side_input", SideInput.getDefaultInstance())
                             .putStateSpecs("user_state", StateSpec.getDefaultInstance())
-                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
                             .build()
                             .toByteString()))
             .setEnvironmentId("foo")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 97b8091..78cba31 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
 import org.apache.beam.runners.core.construction.Environments;
@@ -1041,7 +1041,7 @@ public class GreedyPipelineFuserTest {
                     .setPayload(
                         ParDoPayload.newBuilder()
                             .setDoFn(FunctionSpec.newBuilder())
-                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
                             .build()
                             .toByteString()))
             .setEnvironmentId("common")
@@ -1100,7 +1100,7 @@ public class GreedyPipelineFuserTest {
                         ParDoPayload.newBuilder()
                             .setDoFn(FunctionSpec.newBuilder())
                             .putStateSpecs("state", StateSpec.getDefaultInstance())
-                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
                             .build()
                             .toByteString()))
             .setEnvironmentId("common")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 8dc0f1e..c60f5e1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -339,7 +339,7 @@ public class GreedyStageFuserTest {
                     .setPayload(
                         ParDoPayload.newBuilder()
                             .setDoFn(FunctionSpec.newBuilder())
-                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
                             .build()
                             .toByteString()))
             .setEnvironmentId("common")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 69336b7..5d11bfc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -62,7 +62,8 @@ public class ImmutableExecutableStageTest {
                             .setDoFn(RunnerApi.FunctionSpec.newBuilder())
                             .putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance())
                             .putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance())
-                            .putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
+                            .putTimerFamilySpecs(
+                                "timer", RunnerApi.TimerFamilySpec.getDefaultInstance())
                             .build()
                             .toByteString()))
             .build();
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 64a2d4d..e5696bf 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -119,7 +120,7 @@ public class SimpleDoFnRunnerTest {
     thrown.expectCause(is(fn.exceptionToThrow));
 
     runner.onTimer(
-        ThrowingDoFn.TIMER_ID,
+        TimerDeclaration.PREFIX + ThrowingDoFn.TIMER_ID,
         "",
         GlobalWindow.INSTANCE,
         new Instant(0),
@@ -160,7 +161,7 @@ public class SimpleDoFnRunnerTest {
     verify(mockTimerInternals)
         .setTimer(
             StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
-            DoFnWithTimers.TIMER_ID,
+            TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
             "",
             currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
             currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
@@ -243,7 +244,7 @@ public class SimpleDoFnRunnerTest {
     // Mocking is not easily compatible with annotation analysis, so we manually record
     // the method call.
     runner.onTimer(
-        DoFnWithTimers.TIMER_ID,
+        TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
         "",
         GlobalWindow.INSTANCE,
         currentTime.plus(offset),
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 7b148a2..4b9e882 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
 import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec;
-import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerSpec;
 import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
 import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
@@ -227,13 +226,14 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
             }
 
             @Override
-            public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(
+            public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
                 SdkComponents newComponents) {
-              Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+              Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
               for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
                   signature.timerDeclarations().entrySet()) {
-                RunnerApi.TimerSpec spec =
-                    translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
+                RunnerApi.TimerFamilySpec spec =
+                    translateTimerFamilySpec(
+                        getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
                 timerSpecs.put(timer.getKey(), spec);
               }
               return timerSpecs;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index db25000..29ebcb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -344,8 +344,8 @@ public class CreateExecutableStageNodeFunction
 
             // Build the necessary components to inform the SDK Harness of the pipeline's
             // user timers and user state.
-            for (Map.Entry<String, RunnerApi.TimerSpec> entry :
-                parDoPayload.getTimerSpecsMap().entrySet()) {
+            for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry :
+                parDoPayload.getTimerFamilySpecsMap().entrySet()) {
               timerIds.add(entry.getKey());
             }
             for (Map.Entry<String, RunnerApi.StateSpec> entry :
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
index a8c0586..550115d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
@@ -63,6 +63,7 @@ import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -164,6 +165,7 @@ public class TimerReceiverTest implements Serializable {
   @Test
   public void testSingleTimerScheduling() throws Exception {
     final String timerId = "timerId";
+    final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
 
     Pipeline p = Pipeline.create();
     PCollection<Integer> output =
@@ -227,8 +229,8 @@ public class TimerReceiverTest implements Serializable {
               }
             });
 
-    String timerOutputPCollection = timerSpecMap.get(timerId).outputCollectionId();
-    String timerInputPCollection = timerSpecMap.get(timerId).inputCollectionId();
+    String timerOutputPCollection = timerSpecMap.get(timerDeclarationId).outputCollectionId();
+    String timerInputPCollection = timerSpecMap.get(timerDeclarationId).inputCollectionId();
 
     // Arbitrary offset.
     long testTimerOffset = 123456;
@@ -270,6 +272,8 @@ public class TimerReceiverTest implements Serializable {
   public void testMultiTimerScheduling() throws Exception {
     final String timerId1 = "timerId1";
     final String timerId2 = "timerId2";
+    final String timerDeclarationId1 = TimerDeclaration.PREFIX + timerId1;
+    final String timerDeclarationId2 = TimerDeclaration.PREFIX + timerId2;
 
     Pipeline p = Pipeline.create();
     PCollection<Integer> output =
@@ -362,9 +366,11 @@ public class TimerReceiverTest implements Serializable {
 
     // Simulate the SDK Harness sending a timer element to the Runner Harness.
     assertTrue(
-        timerReceiver.receive(timerSpecMap.get(timerId1).outputCollectionId(), windowedTimer1));
+        timerReceiver.receive(
+            timerSpecMap.get(timerDeclarationId1).outputCollectionId(), windowedTimer1));
     assertTrue(
-        timerReceiver.receive(timerSpecMap.get(timerId2).outputCollectionId(), windowedTimer2));
+        timerReceiver.receive(
+            timerSpecMap.get(timerDeclarationId2).outputCollectionId(), windowedTimer2));
 
     // Expect that we get a timer element when we finish.
     Object expectedTimer1 =
@@ -383,12 +389,12 @@ public class TimerReceiverTest implements Serializable {
 
     Mockito.verify(timerReceiver, Mockito.never())
         .fireTimer(
-            timerSpecMap.get(timerId1).inputCollectionId(),
+            timerSpecMap.get(timerDeclarationId1).inputCollectionId(),
             (WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
                 expectedTimer1);
     Mockito.verify(timerReceiver, Mockito.never())
         .fireTimer(
-            timerSpecMap.get(timerId2).inputCollectionId(),
+            timerSpecMap.get(timerDeclarationId2).inputCollectionId(),
             (WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
                 expectedTimer2);
 
@@ -397,12 +403,12 @@ public class TimerReceiverTest implements Serializable {
     timerReceiver.finish();
     Mockito.verify(timerReceiver)
         .fireTimer(
-            timerSpecMap.get(timerId1).inputCollectionId(),
+            timerSpecMap.get(timerDeclarationId1).inputCollectionId(),
             (WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
                 expectedTimer1);
     Mockito.verify(timerReceiver)
         .fireTimer(
-            timerSpecMap.get(timerId2).inputCollectionId(),
+            timerSpecMap.get(timerDeclarationId2).inputCollectionId(),
             (WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
                 expectedTimer2);
   }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index 80b3f61..5bfee8f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -354,7 +354,7 @@ public class ProcessBundleDescriptors {
           RunnerApi.ParDoPayload.parseFrom(
               timerReference.transform().getTransform().getSpec().getPayload());
       RunnerApi.TimeDomain.Enum timeDomain =
-          payload.getTimerSpecsOrThrow(timerReference.localName()).getTimeDomain();
+          payload.getTimerFamilySpecsOrThrow(timerReference.localName()).getTimeDomain();
       org.apache.beam.sdk.state.TimerSpec spec;
       switch (timeDomain) {
         case EVENT_TIME:
@@ -380,13 +380,15 @@ public class ProcessBundleDescriptors {
                           timerReference.transform().getTransform().getInputsMap().keySet(),
                           Sets.union(
                               payload.getSideInputsMap().keySet(),
-                              payload.getTimerSpecsMap().keySet()))));
+                              payload.getTimerFamilySpecsMap().keySet()))));
       String timerCoderId =
           keyValueCoderId(
               components
                   .getCodersOrThrow(components.getPcollectionsOrThrow(mainInputName).getCoderId())
                   .getComponentCoderIds(0),
-              payload.getTimerSpecsOrThrow(timerReference.localName()).getTimerCoderId(),
+              payload
+                  .getTimerFamilySpecsOrThrow(timerReference.localName())
+                  .getTimerFamilyCoderId(),
               components);
       RunnerApi.PCollection timerCollectionSpec =
           components
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 1df70db..8a423a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -1135,6 +1135,9 @@ public abstract class DoFnSignature {
    */
   @AutoValue
   public abstract static class TimerDeclaration {
+
+    public static final String PREFIX = "ts-";
+
     public abstract String id();
 
     public abstract Field field();
@@ -1150,6 +1153,9 @@ public abstract class DoFnSignature {
    */
   @AutoValue
   public abstract static class TimerFamilyDeclaration {
+
+    public static final String PREFIX = "tfs-";
+
     public abstract String id();
 
     public abstract Field field();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index cea704b..31ad817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -532,7 +532,7 @@ public class DoFnSignatures {
     HashMap<String, DoFnSignature.OnTimerMethod> onTimerMethodMap =
         Maps.newHashMapWithExpectedSize(onTimerMethods.size());
     for (Method onTimerMethod : onTimerMethods) {
-      String id = onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
+      String id = TimerDeclaration.PREFIX + onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
       errors.checkArgument(
           fnContext.getTimerDeclarations().containsKey(id),
           "Callback %s is for undeclared timer %s",
@@ -560,7 +560,9 @@ public class DoFnSignatures {
         Maps.newHashMapWithExpectedSize(onTimerFamilyMethods.size());
 
     for (Method onTimerFamilyMethod : onTimerFamilyMethods) {
-      String id = onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
+      String id =
+          TimerFamilyDeclaration.PREFIX
+              + onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
       errors.checkArgument(
           fnContext.getTimerFamilyDeclarations().containsKey(id),
           "Callback %s is for undeclared timerFamily %s",
@@ -1471,14 +1473,14 @@ public class DoFnSignatures {
 
   @Nullable
   private static String getTimerId(List<Annotation> annotations) {
-    DoFn.TimerId stateId = findFirstOfType(annotations, DoFn.TimerId.class);
-    return stateId != null ? stateId.value() : null;
+    DoFn.TimerId timerId = findFirstOfType(annotations, DoFn.TimerId.class);
+    return timerId != null ? TimerDeclaration.PREFIX + timerId.value() : null;
   }
 
   @Nullable
   private static String getTimerFamilyId(List<Annotation> annotations) {
     DoFn.TimerFamily timerFamilyId = findFirstOfType(annotations, DoFn.TimerFamily.class);
-    return timerFamilyId != null ? timerFamilyId.value() : null;
+    return timerFamilyId != null ? TimerFamilyDeclaration.PREFIX + timerFamilyId.value() : null;
   }
 
   @Nullable
@@ -1761,7 +1763,8 @@ public class DoFnSignatures {
     for (Field field : declaredFieldsWithAnnotation(DoFn.TimerFamily.class, fnClazz, DoFn.class)) {
       // TimerSpec fields may generally be private, but will be accessed via the signature
       field.setAccessible(true);
-      String id = field.getAnnotation(DoFn.TimerFamily.class).value();
+      String id =
+          TimerFamilyDeclaration.PREFIX + field.getAnnotation(DoFn.TimerFamily.class).value();
       validateTimerFamilyField(errors, declarations, id, field);
       declarations.put(id, TimerFamilyDeclaration.create(id, field));
     }
@@ -1775,7 +1778,9 @@ public class DoFnSignatures {
     for (Field field : declaredFieldsWithAnnotation(DoFn.TimerId.class, fnClazz, DoFn.class)) {
       // TimerSpec fields may generally be private, but will be accessed via the signature
       field.setAccessible(true);
-      String id = field.getAnnotation(DoFn.TimerId.class).value();
+      // Add fixed prefix to avoid key collision with TimerFamily.
+      String id =
+          DoFnSignature.TimerDeclaration.PREFIX + field.getAnnotation(DoFn.TimerId.class).value();
       validateTimerField(errors, declarations, id, field);
       declarations.put(id, DoFnSignature.TimerDeclaration.create(id, field));
     }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index dc97ec8..5224865 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
@@ -123,7 +124,8 @@ public class DoFnInvokersTest {
   }
 
   private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
-    DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, "", mockArgumentProvider);
+    DoFnInvokers.invokerFor(fn)
+        .invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
   }
 
   @Test
@@ -269,7 +271,7 @@ public class DoFnInvokersTest {
   public void testDoFnWithTimer() throws Exception {
     Timer mockTimer = mock(Timer.class);
     final String timerId = "my-timer-id-here";
-    when(mockArgumentProvider.timer(timerId)).thenReturn(mockTimer);
+    when(mockArgumentProvider.timer(TimerDeclaration.PREFIX + timerId)).thenReturn(mockTimer);
 
     class MockFn extends DoFn<String, String> {
       @TimerId(timerId)
@@ -1038,7 +1040,7 @@ public class DoFnInvokersTest {
     SimpleTimerDoFn fn = new SimpleTimerDoFn();
 
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
-    invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
+    invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
     assertThat(fn.status, equalTo("OK now"));
   }
 
@@ -1067,7 +1069,7 @@ public class DoFnInvokersTest {
     SimpleTimerDoFn fn = new SimpleTimerDoFn();
 
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
-    invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
+    invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
     assertThat(fn.window, equalTo(testWindow));
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 1414371..ee5ddc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -72,6 +72,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -558,6 +559,7 @@ public class DoFnSignaturesTest {
   @Test
   public void testWindowParamOnTimer() throws Exception {
     final String timerId = "some-timer-id";
+    final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
 
     DoFnSignature sig =
         DoFnSignatures.getSignature(
@@ -572,15 +574,16 @@ public class DoFnSignaturesTest {
               public void onTimer(BoundedWindow w) {}
             }.getClass());
 
-    assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(1));
+    assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1));
     assertThat(
-        sig.onTimerMethods().get(timerId).extraParameters().get(0),
+        sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
         instanceOf(WindowParameter.class));
   }
 
   @Test
   public void testAllParamsOnTimer() throws Exception {
     final String timerId = "some-timer-id";
+    final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
 
     DoFnSignature sig =
         DoFnSignatures.getSignature(
@@ -596,15 +599,15 @@ public class DoFnSignaturesTest {
                   @Timestamp Instant timestamp, TimeDomain timeDomain, BoundedWindow w) {}
             }.getClass());
 
-    assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(3));
+    assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(3));
     assertThat(
-        sig.onTimerMethods().get(timerId).extraParameters().get(0),
+        sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
         instanceOf(TimestampParameter.class));
     assertThat(
-        sig.onTimerMethods().get(timerId).extraParameters().get(1),
+        sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(1),
         instanceOf(TimeDomainParameter.class));
     assertThat(
-        sig.onTimerMethods().get(timerId).extraParameters().get(2),
+        sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(2),
         instanceOf(WindowParameter.class));
   }
 
@@ -631,7 +634,8 @@ public class DoFnSignaturesTest {
     assertThat(sig.processElement().extraParameters().size(), equalTo(2));
 
     DoFnSignature.TimerDeclaration decl =
-        sig.timerDeclarations().get(DoFnOverridingAbstractTimerUse.TIMER_ID);
+        sig.timerDeclarations()
+            .get(TimerDeclaration.PREFIX + DoFnOverridingAbstractTimerUse.TIMER_ID);
     TimerParameter timerParam = (TimerParameter) sig.processElement().extraParameters().get(1);
 
     assertThat(
@@ -656,9 +660,11 @@ public class DoFnSignaturesTest {
     assertThat(sig.onTimerMethods().size(), equalTo(1));
 
     DoFnSignature.TimerDeclaration decl =
-        sig.timerDeclarations().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+        sig.timerDeclarations()
+            .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
     DoFnSignature.OnTimerMethod callback =
-        sig.onTimerMethods().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+        sig.onTimerMethods()
+            .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
 
     assertThat(
         decl.field(),
@@ -727,10 +733,11 @@ public class DoFnSignaturesTest {
               public void onFoo() {}
             }.getClass());
 
+    final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
     assertThat(sig.timerDeclarations().size(), equalTo(1));
-    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
 
-    assertThat(decl.id(), equalTo("foo"));
+    assertThat(decl.id(), equalTo(timerDeclarationId));
     assertThat(decl.field().getName(), equalTo("bizzle"));
   }
 
@@ -749,14 +756,15 @@ public class DoFnSignaturesTest {
               public void onFoo(OnTimerContext c) {}
             }.getClass());
 
+    final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
     assertThat(sig.timerDeclarations().size(), equalTo(1));
-    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
 
-    assertThat(decl.id(), equalTo("foo"));
+    assertThat(decl.id(), equalTo(timerDeclarationId));
     assertThat(decl.field().getName(), equalTo("bizzle"));
 
     assertThat(
-        sig.onTimerMethods().get("foo").extraParameters().get(0),
+        sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
         equalTo((Parameter) Parameter.onTimerContext()));
   }
 
@@ -796,10 +804,12 @@ public class DoFnSignaturesTest {
     // Test classes at the bottom of the file
     DoFnSignature sig = DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleTimerIdNamedDoFn());
 
+    final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
+
     assertThat(sig.timerDeclarations().size(), equalTo(1));
-    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+    DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
 
-    assertThat(decl.id(), equalTo("foo"));
+    assertThat(decl.id(), equalTo(timerDeclarationId));
     assertThat(
         decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index b4553c9..dbf2e2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.junit.Before;
 import org.junit.Rule;
@@ -52,7 +53,8 @@ public class OnTimerInvokersTest {
   }
 
   private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
-    OnTimerInvokers.forTimer(fn, timerId).invokeOnTimer(mockArgumentProvider);
+    OnTimerInvokers.forTimer(fn, TimerDeclaration.PREFIX + timerId)
+        .invokeOnTimer(mockArgumentProvider);
   }
 
   @Test
@@ -123,7 +125,8 @@ public class OnTimerInvokersTest {
   @Test
   public void testStableName() {
     OnTimerInvoker<Void, Void> invoker =
-        OnTimerInvokers.forTimer(new StableNameTestDoFn(), StableNameTestDoFn.TIMER_ID);
+        OnTimerInvokers.forTimer(
+            new StableNameTestDoFn(), TimerDeclaration.PREFIX + StableNameTestDoFn.TIMER_ID);
 
     assertThat(
         invoker.getClass().getName(),
@@ -132,7 +135,7 @@ public class OnTimerInvokersTest {
                 "%s$%s$%s$%s",
                 StableNameTestDoFn.class.getName(),
                 OnTimerInvoker.class.getSimpleName(),
-                "timeridwithspecialChars" /* alphanum only; human readable but not unique */,
-                "dGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */)));
+                "tstimeridwithspecialChars" /* alphanum only; human readable but not unique */,
+                "dHMtdGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */)));
   }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index ec26195..49aa850 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -203,7 +203,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
 
       // Register as a consumer for each timer PCollection.
-      for (String localName : runner.parDoPayload.getTimerSpecsMap().keySet()) {
+      for (String localName : runner.parDoPayload.getTimerFamilySpecsMap().keySet()) {
         TimeDomain timeDomain =
             DoFnSignatures.getTimerSpecOrThrow(
                     runner.doFnSignature.timerDeclarations().get(localName), runner.doFn)
@@ -362,7 +362,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                   pTransform.getInputsMap().keySet(),
                   Sets.union(
                       parDoPayload.getSideInputsMap().keySet(),
-                      parDoPayload.getTimerSpecsMap().keySet())));
+                      parDoPayload.getTimerFamilySpecsMap().keySet())));
       PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
       Coder<?> maybeWindowedValueInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
       // TODO: Stop passing windowed value coders within PCollections.
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 426bf85..7ed8b82 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -84,6 +84,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -802,11 +803,14 @@ public class FnApiDoFnRunnerTest implements Serializable {
     RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
     String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
     String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
-    String eventTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event";
-    String eventTimerOutputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event.output";
-    String processingTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).processing";
+    String eventTimerInputPCollectionId =
+        "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event";
+    String eventTimerOutputPCollectionId =
+        "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event.output";
+    String processingTimerInputPCollectionId =
+        "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing";
     String processingTimerOutputPCollectionId =
-        "pTransformId/ParMultiDo(TestTimerful).processing.output";
+        "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing.output";
 
     RunnerApi.PTransform pTransform =
         pProto
@@ -816,8 +820,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
             .toBuilder()
             // We need to re-write the "output" PCollections that a runner would have inserted
             // on the way to a output sink.
-            .putOutputs("event", eventTimerOutputPCollectionId)
-            .putOutputs("processing", processingTimerOutputPCollectionId)
+            .putOutputs(TimerDeclaration.PREFIX + "event", eventTimerOutputPCollectionId)
+            .putOutputs(TimerDeclaration.PREFIX + "processing", processingTimerOutputPCollectionId)
             .build();
 
     FakeBeamFnStateClient fakeClient =
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 96789e6..8ccd2d4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -253,8 +253,7 @@ class FnApiRunner(runner.PipelineRunner):
         if payload.requests_finalization:
           expected_requirements.add(
               common_urns.requirements.REQUIRES_BUNDLE_FINALIZATION.urn)
-        if (payload.state_specs or payload.timer_specs or
-            payload.timer_family_specs):
+        if (payload.state_specs or payload.timer_family_specs):
           expected_requirements.add(
               common_urns.requirements.REQUIRES_STATEFUL_PROCESSING.urn)
         if payload.requires_stable_input:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index 884fb48..83c9315 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -255,7 +255,7 @@ class Stage(object):
             user_states.append(
                 beam_runner_api_pb2.ExecutableStagePayload.UserStateId(
                     transform_id=transform_id, local_name=tag))
-          for tag in payload.timer_specs.keys():
+          for tag in payload.timer_family_specs.keys():
             timers.append(
                 beam_runner_api_pb2.ExecutableStagePayload.TimerId(
                     transform_id=transform_id, local_name=tag))
@@ -633,7 +633,7 @@ def annotate_stateful_dofns_as_roots(stages, pipeline_context):
       if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
         pardo_payload = proto_utils.parse_Bytes(
             transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
-        if pardo_payload.state_specs or pardo_payload.timer_specs:
+        if pardo_payload.state_specs or pardo_payload.timer_family_specs:
           stage.forced_root = True
     yield stage
 
@@ -1319,7 +1319,7 @@ def inject_timer_pcollections(stages, pipeline_context):
       if transform.spec.urn in PAR_DO_URNS:
         payload = proto_utils.parse_Bytes(
             transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
-        for tag, spec in payload.timer_specs.items():
+        for tag, spec in payload.timer_family_specs.items():
           if len(transform.inputs) > 1:
             raise NotImplementedError('Timers and side inputs.')
           input_pcoll = pipeline_context.components.pcollections[next(
@@ -1334,7 +1334,9 @@ def inject_timer_pcollections(stages, pipeline_context):
               beam_runner_api_pb2.Coder(
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=common_urns.coders.KV.urn),
-                  component_coder_ids=[key_coder_id, spec.timer_coder_id]))
+                  component_coder_ids=[
+                      key_coder_id, spec.timer_family_coder_id
+                  ]))
           # Inject the read and write pcollections.
           timer_read_pcoll = unique_name(
               pipeline_context.components.pcollections,
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index a963a08..9180695 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -601,7 +601,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
                transform_id,  # type: str
                key_coder,  # type: coders.Coder
                window_coder,  # type: coders.Coder
-               timer_specs  # type: Mapping[str, beam_runner_api_pb2.TimerSpec]
+               timer_family_specs  # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec]
               ):
     # type: (...) -> None
 
@@ -612,14 +612,14 @@ class FnApiUserStateContext(userstate.UserStateContext):
       transform_id: The name of the PTransform that this context is associated.
       key_coder:
       window_coder:
-      timer_specs: A list of ``userstate.TimerSpec`` objects specifying the
-        timers associated with this operation.
+      timer_family_specs: A mapping from timer tag to its
+        ``beam_runner_api_pb2.TimerFamilySpec`` for this operation.
     """
     self._state_handler = state_handler
     self._transform_id = transform_id
     self._key_coder = key_coder
     self._window_coder = window_coder
-    self._timer_specs = timer_specs
+    self._timer_family_specs = timer_family_specs
     self._timer_receivers = None  # type: Optional[Dict[str, operations.ConsumerSet]]
     self._all_states = {
     }  # type: Dict[tuple, userstate.AccumulatingRuntimeState]
@@ -629,7 +629,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
 
     """TODO"""
     self._timer_receivers = {}
-    for tag in self._timer_specs:
+    for tag in self._timer_family_specs:
       self._timer_receivers[tag] = receivers.pop(tag)
 
   def get_timer(
@@ -1478,7 +1478,7 @@ def _create_pardo_operation(
     if pardo_proto:
       other_input_tags = set.union(
           set(pardo_proto.side_inputs),
-          set(pardo_proto.timer_specs))  # type: Container[str]
+          set(pardo_proto.timer_family_specs))  # type: Container[str]
     else:
       other_input_tags = ()
     pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items()
@@ -1488,12 +1488,12 @@ def _create_pardo_operation(
     serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing, ))
 
   timer_inputs = None  # type: Optional[Dict[str, str]]
-  if pardo_proto and (pardo_proto.timer_specs or pardo_proto.state_specs or
-                      pardo_proto.restriction_coder_id):
+  if pardo_proto and (pardo_proto.timer_family_specs or pardo_proto.state_specs
+                      or pardo_proto.restriction_coder_id):
     main_input_coder = None  # type: Optional[WindowedValueCoder]
     timer_inputs = {}
     for tag, pcoll_id in transform_proto.inputs.items():
-      if tag in pardo_proto.timer_specs:
+      if tag in pardo_proto.timer_family_specs:
         timer_inputs[tag] = pcoll_id
       elif tag in pardo_proto.side_inputs:
         pass
@@ -1504,13 +1504,13 @@ def _create_pardo_operation(
         main_input_coder = factory.get_windowed_coder(pcoll_id)
     assert main_input_coder is not None
 
-    if pardo_proto.timer_specs or pardo_proto.state_specs:
+    if pardo_proto.timer_family_specs or pardo_proto.state_specs:
       user_state_context = FnApiUserStateContext(
           factory.state_handler,
           transform_id,
           main_input_coder.key_coder(),
           main_input_coder.window_coder,
-          timer_specs=pardo_proto.timer_specs
+          timer_family_specs=pardo_proto.timer_family_specs
       )  # type: Optional[FnApiUserStateContext]
     else:
       user_state_context = None
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8ebffbd..4f502c5 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1325,7 +1325,7 @@ class ParDo(PTransformWithSideInputs):
                 spec.name: spec.to_runner_api(context)
                 for spec in state_specs
             },
-            timer_specs={
+            timer_family_specs={
                 spec.name: spec.to_runner_api(context)
                 for spec in timer_specs
             },
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 70cdca2..f3ffa88 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -131,8 +131,10 @@ class CombiningValueStateSpec(StateSpec):
 
 class TimerSpec(object):
   """Specification for a user stateful DoFn timer."""
+  prefix = "ts-"
+
   def __init__(self, name, time_domain):
-    self.name = name
+    self.name = self.prefix + name
     if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
       raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, ))
     self.time_domain = time_domain
@@ -142,10 +144,30 @@ class TimerSpec(object):
     return '%s(%s)' % (self.__class__.__name__, self.name)
 
   def to_runner_api(self, context):
-    # type: (PipelineContext) -> beam_runner_api_pb2.TimerSpec
-    return beam_runner_api_pb2.TimerSpec(
+    # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec
+    return beam_runner_api_pb2.TimerFamilySpec(
+        time_domain=TimeDomain.to_runner_api(self.time_domain),
+        timer_family_coder_id=context.coders.get_id(
+            coders._TimerCoder(coders.SingletonCoder(None))))
+
+
+# TODO(BEAM-9602): Provide support for dynamic timers.
+class TimerFamilySpec(object):
+  prefix = "tfs-"
+
+  def __init__(self, name, time_domain):
+    self.name = self.prefix + name
+    if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
+      raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, ))
+    self.time_domain = time_domain
+
+  def __repr__(self):
+    return '%s(%s)' % (self.__class__.__name__, self.name)
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.TimerFamilySpec(
         time_domain=TimeDomain.to_runner_api(self.time_domain),
-        timer_coder_id=context.coders.get_id(
+        timer_family_coder_id=context.coders.get_id(
             coders._TimerCoder(coders.SingletonCoder(None))))
 
 
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 75d9416..8ad3e95 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -265,7 +265,7 @@ class InterfaceTest(unittest.TestCase):
     # different timer callbacks.
     with self.assertRaisesRegex(
         ValueError,
-        r'Multiple on_timer callbacks registered for TimerSpec\(expiry1\).'):
+        r'Multiple on_timer callbacks registered for TimerSpec\(.*expiry1\).'):
 
       class StatefulDoFnWithTimerWithTypo1(DoFn):  # pylint: disable=unused-variable
         BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
@@ -315,7 +315,7 @@ class InterfaceTest(unittest.TestCase):
     dofn = StatefulDoFnWithTimerWithTypo2()
     with self.assertRaisesRegex(
         ValueError,
-        (r'The on_timer callback for TimerSpec\(expiry1\) is not the '
+        (r'The on_timer callback for TimerSpec\(.*expiry1\) is not the '
          r'specified .on_expiry_1 method for DoFn '
          r'StatefulDoFnWithTimerWithTypo2 \(perhaps it was overwritten\?\).')):
       validate_stateful_dofn(dofn)
@@ -348,7 +348,7 @@ class InterfaceTest(unittest.TestCase):
     with self.assertRaisesRegex(
         ValueError,
         (r'DoFn StatefulDoFnWithTimerWithTypo3 has a TimerSpec without an '
-         r'associated on_timer callback: TimerSpec\(expiry2\).')):
+         r'associated on_timer callback: TimerSpec\(.*expiry2\).')):
       validate_stateful_dofn(dofn)