You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/03/27 04:21:22 UTC
[beam] branch master updated: Remove TimerSpec from proto
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 39703c6 Remove TimerSpec from proto
new 33d78c5 Merge pull request #11216 from boyuanzz/timer_spec
39703c6 is described below
commit 39703c640b785349719f05278c8ee8b1944076d6
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Tue Mar 24 14:53:29 2020 -0700
Remove TimerSpec from proto
---
.../pipeline/src/main/proto/beam_runner_api.proto | 16 ++-------
.../core/construction/ParDoTranslation.java | 32 +++++++----------
.../runners/core/construction/SplittableParDo.java | 3 +-
.../beam/runners/core/construction/Timer.java | 3 +-
.../graph/GreedyPCollectionFusers.java | 5 +--
.../core/construction/graph/PipelineValidator.java | 2 +-
.../core/construction/graph/QueryablePipeline.java | 4 ++-
.../construction/graph/ExecutableStageTest.java | 4 +--
.../graph/GreedyPipelineFuserTest.java | 6 ++--
.../construction/graph/GreedyStageFuserTest.java | 4 +--
.../graph/ImmutableExecutableStageTest.java | 3 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 7 ++--
.../dataflow/PrimitiveParDoSingleFactory.java | 10 +++---
.../graph/CreateExecutableStageNodeFunction.java | 4 +--
.../worker/fn/control/TimerReceiverTest.java | 22 +++++++-----
.../control/ProcessBundleDescriptors.java | 8 +++--
.../beam/sdk/transforms/reflect/DoFnSignature.java | 6 ++++
.../sdk/transforms/reflect/DoFnSignatures.java | 19 ++++++----
.../sdk/transforms/reflect/DoFnInvokersTest.java | 10 +++---
.../sdk/transforms/reflect/DoFnSignaturesTest.java | 42 +++++++++++++---------
.../transforms/reflect/OnTimerInvokersTest.java | 11 +++---
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 4 +--
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 16 +++++----
.../runners/portability/fn_api_runner/fn_runner.py | 3 +-
.../portability/fn_api_runner/translations.py | 10 +++---
.../apache_beam/runners/worker/bundle_processor.py | 22 ++++++------
sdks/python/apache_beam/transforms/core.py | 2 +-
sdks/python/apache_beam/transforms/userstate.py | 30 +++++++++++++---
.../apache_beam/transforms/userstate_test.py | 6 ++--
29 files changed, 181 insertions(+), 133 deletions(-)
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index e5d5eb6..17229d2 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -442,14 +442,9 @@ message ParDoPayload {
// be placed in the pipeline requirements.
map<string, StateSpec> state_specs = 4;
- // (Optional) A mapping of local timer names to timer specifications.
- // If this is set, the stateful processing requirement should also
- // be placed in the pipeline requirements.
- map<string, TimerSpec> timer_specs = 5;
-
- // (Optional) A mapping of local timer family names to timer specifications.
- // If this is set, the stateful processing requirement should also
- // be placed in the pipeline requirements.
+ // (Optional) A mapping of local timer family names to timer family
+ // specifications. If this is set, the stateful processing requirement should
+ // also be placed in the pipeline requirements.
map<string, TimerFamilySpec> timer_family_specs = 9;
// (Optional) Only set when this ParDo contains a splittable DoFn.
@@ -507,11 +502,6 @@ message SetStateSpec {
string element_coder_id = 1;
}
-message TimerSpec {
- TimeDomain.Enum time_domain = 1;
- string timer_coder_id = 2;
-}
-
message TimerFamilySpec {
TimeDomain.Enum time_domain = 1;
string timer_family_coder_id = 2;
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 256a31d..969ae25 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -185,7 +185,7 @@ public class ParDoTranslation {
// https://s.apache.org/beam-portability-timers
// Add a PCollection and coder for each timer. Also treat them as inputs and outputs.
- for (String localTimerName : payload.getTimerSpecsMap().keySet()) {
+ for (String localTimerName : payload.getTimerFamilySpecsMap().keySet()) {
PCollection<?> timerPCollection =
PCollection.createPrimitiveOutputInternal(
// Create a dummy pipeline since we don't want to modify the current
@@ -294,12 +294,14 @@ public class ParDoTranslation {
}
@Override
- public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents) {
- Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+ public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
+ SdkComponents newComponents) {
+ Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
for (Map.Entry<String, TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
- RunnerApi.TimerSpec spec =
- translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
+ RunnerApi.TimerFamilySpec spec =
+ translateTimerFamilySpec(
+ getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
timerSpecs.put(timer.getKey(), spec);
}
@@ -538,7 +540,8 @@ public class ParDoTranslation {
return Iterables.getOnlyElement(
Sets.difference(
ptransform.getInputsMap().keySet(),
- Sets.union(payload.getSideInputsMap().keySet(), payload.getTimerSpecsMap().keySet())));
+ Sets.union(
+ payload.getSideInputsMap().keySet(), payload.getTimerFamilySpecsMap().keySet())));
}
/** Translate state specs. */
@@ -654,15 +657,6 @@ public class ParDoTranslation {
}
}
- public static RunnerApi.TimerSpec translateTimerSpec(TimerSpec timer, SdkComponents components) {
- return RunnerApi.TimerSpec.newBuilder()
- .setTimeDomain(translateTimeDomain(timer.getTimeDomain()))
- // TODO: Add support for timer payloads to the SDK
- // We currently assume that all payloads are unspecified.
- .setTimerCoderId(registerCoderOrThrow(components, Timer.Coder.of(VoidCoder.of())))
- .build();
- }
-
public static RunnerApi.TimerFamilySpec translateTimerFamilySpec(
TimerSpec timer, SdkComponents components) {
return RunnerApi.TimerFamilySpec.newBuilder()
@@ -756,9 +750,7 @@ public class ParDoTranslation {
public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
ParDoPayload payload = getParDoPayload(transform);
- return payload.getStateSpecsCount() > 0
- || payload.getTimerSpecsCount() > 0
- || payload.getTimerFamilySpecsCount() > 0;
+ return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
}
public static boolean isSplittable(AppliedPTransform<?, ?, ?> transform) throws IOException {
@@ -783,7 +775,7 @@ public class ParDoTranslation {
Map<String, RunnerApi.StateSpec> translateStateSpecs(SdkComponents components)
throws IOException;
- Map<String, RunnerApi.TimerSpec> translateTimerSpecs(SdkComponents newComponents);
+ Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(SdkComponents newComponents);
Map<String, RunnerApi.TimerFamilySpec> translateTimerFamilySpecs(SdkComponents newComponents);
@@ -822,7 +814,7 @@ public class ParDoTranslation {
return ParDoPayload.newBuilder()
.setDoFn(parDo.translateDoFn(components))
.putAllStateSpecs(parDo.translateStateSpecs(components))
- .putAllTimerSpecs(parDo.translateTimerSpecs(components))
+ .putAllTimerFamilySpecs(parDo.translateTimerSpecs(components))
.putAllTimerFamilySpecs(parDo.translateTimerFamilySpecs(components))
.putAllSideInputs(parDo.translateSideInputs(components))
.setRequiresStableInput(parDo.isRequiresStableInput())
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 4dab59e..86384ef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -32,7 +32,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoLike;
import org.apache.beam.runners.core.construction.ReadTranslation.BoundedReadPayloadTranslator;
@@ -403,7 +402,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT, WatermarkEstimatorSt
}
@Override
- public Map<String, TimerSpec> translateTimerSpecs(SdkComponents components) {
+ public Map<String, TimerFamilySpec> translateTimerSpecs(SdkComponents components) {
// SDFs don't have timers.
return ImmutableMap.of();
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 072dbc7..66afe7c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -54,7 +54,8 @@ public abstract class Timer<T> {
* Returns the timestamp of when the timer is scheduled to fire.
*
* <p>The time is relative to the time domain defined in the {@link
- * org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec} that is associated with this timer.
+ * org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec} that is associated with this
+ * timer.
*/
public abstract Instant getTimestamp();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index 3d7d414..b3f18a2 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -206,12 +206,13 @@ class GreedyPCollectionFusers {
try {
ParDoPayload payload = ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
if (Maps.filterKeys(
- parDo.getTransform().getInputsMap(), s -> payload.getTimerSpecsMap().containsKey(s))
+ parDo.getTransform().getInputsMap(),
+ s -> payload.getTimerFamilySpecsMap().containsKey(s))
.values()
.contains(candidate.getId())) {
// Allow fusion across timer PCollections because they are a self loop.
return true;
- } else if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+ } else if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
// Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
// a key must execute serially. To avoid checking if the rest of the stage is
// key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
index c80947b..34e74a7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineValidator.java
@@ -229,7 +229,7 @@ public class PipelineValidator {
id,
sideInputId);
}
- if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+ if (payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0) {
checkArgument(requirements.contains(ParDoTranslation.REQUIRES_STATEFUL_PROCESSING_URN));
// TODO: Validate state_specs and timer_specs
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 099294d..1f5e381 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -414,7 +414,9 @@ public class QueryablePipeline {
private Set<String> getLocalTimerNames(PTransform transform) {
if (PAR_DO_TRANSFORM_URN.equals(transform.getSpec().getUrn())) {
try {
- return ParDoPayload.parseFrom(transform.getSpec().getPayload()).getTimerSpecsMap().keySet();
+ return ParDoPayload.parseFrom(transform.getSpec().getPayload())
+ .getTimerFamilySpecsMap()
+ .keySet();
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index 9a69ed8..6101e6a 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -68,7 +68,7 @@ public class ExecutableStageTest {
.setDoFn(FunctionSpec.newBuilder())
.putSideInputs("side_input", SideInput.getDefaultInstance())
.putStateSpecs("user_state", StateSpec.getDefaultInstance())
- .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+ .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("foo")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 97b8091..78cba31 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -43,7 +43,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
import org.apache.beam.runners.core.construction.Environments;
@@ -1041,7 +1041,7 @@ public class GreedyPipelineFuserTest {
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
- .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+ .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
@@ -1100,7 +1100,7 @@ public class GreedyPipelineFuserTest {
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
.putStateSpecs("state", StateSpec.getDefaultInstance())
- .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+ .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 8dc0f1e..c60f5e1 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -36,7 +36,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
-import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerFamilySpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -339,7 +339,7 @@ public class GreedyStageFuserTest {
.setPayload(
ParDoPayload.newBuilder()
.setDoFn(FunctionSpec.newBuilder())
- .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+ .putTimerFamilySpecs("timer", TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.setEnvironmentId("common")
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 69336b7..5d11bfc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -62,7 +62,8 @@ public class ImmutableExecutableStageTest {
.setDoFn(RunnerApi.FunctionSpec.newBuilder())
.putSideInputs("side_input", RunnerApi.SideInput.getDefaultInstance())
.putStateSpecs("user_state", RunnerApi.StateSpec.getDefaultInstance())
- .putTimerSpecs("timer", RunnerApi.TimerSpec.getDefaultInstance())
+ .putTimerFamilySpecs(
+ "timer", RunnerApi.TimerFamilySpec.getDefaultInstance())
.build()
.toByteString()))
.build();
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 64a2d4d..e5696bf 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
@@ -119,7 +120,7 @@ public class SimpleDoFnRunnerTest {
thrown.expectCause(is(fn.exceptionToThrow));
runner.onTimer(
- ThrowingDoFn.TIMER_ID,
+ TimerDeclaration.PREFIX + ThrowingDoFn.TIMER_ID,
"",
GlobalWindow.INSTANCE,
new Instant(0),
@@ -160,7 +161,7 @@ public class SimpleDoFnRunnerTest {
verify(mockTimerInternals)
.setTimer(
StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
- DoFnWithTimers.TIMER_ID,
+ TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
"",
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
@@ -243,7 +244,7 @@ public class SimpleDoFnRunnerTest {
// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
runner.onTimer(
- DoFnWithTimers.TIMER_ID,
+ TimerDeclaration.PREFIX + DoFnWithTimers.TIMER_ID,
"",
GlobalWindow.INSTANCE,
currentTime.plus(offset),
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index 7b148a2..4b9e882 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -19,7 +19,6 @@ package org.apache.beam.runners.dataflow;
import static org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerFamilySpec;
-import static org.apache.beam.runners.core.construction.ParDoTranslation.translateTimerSpec;
import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
@@ -227,13 +226,14 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
}
@Override
- public Map<String, RunnerApi.TimerSpec> translateTimerSpecs(
+ public Map<String, RunnerApi.TimerFamilySpec> translateTimerSpecs(
SdkComponents newComponents) {
- Map<String, RunnerApi.TimerSpec> timerSpecs = new HashMap<>();
+ Map<String, RunnerApi.TimerFamilySpec> timerSpecs = new HashMap<>();
for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
signature.timerDeclarations().entrySet()) {
- RunnerApi.TimerSpec spec =
- translateTimerSpec(getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
+ RunnerApi.TimerFamilySpec spec =
+ translateTimerFamilySpec(
+ getTimerSpecOrThrow(timer.getValue(), doFn), newComponents);
timerSpecs.put(timer.getKey(), spec);
}
return timerSpecs;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index db25000..29ebcb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -344,8 +344,8 @@ public class CreateExecutableStageNodeFunction
// Build the necessary components to inform the SDK Harness of the pipeline's
// user timers and user state.
- for (Map.Entry<String, RunnerApi.TimerSpec> entry :
- parDoPayload.getTimerSpecsMap().entrySet()) {
+ for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry :
+ parDoPayload.getTimerFamilySpecsMap().entrySet()) {
timerIds.add(entry.getKey());
}
for (Map.Entry<String, RunnerApi.StateSpec> entry :
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
index a8c0586..550115d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiverTest.java
@@ -63,6 +63,7 @@ import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -164,6 +165,7 @@ public class TimerReceiverTest implements Serializable {
@Test
public void testSingleTimerScheduling() throws Exception {
final String timerId = "timerId";
+ final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
Pipeline p = Pipeline.create();
PCollection<Integer> output =
@@ -227,8 +229,8 @@ public class TimerReceiverTest implements Serializable {
}
});
- String timerOutputPCollection = timerSpecMap.get(timerId).outputCollectionId();
- String timerInputPCollection = timerSpecMap.get(timerId).inputCollectionId();
+ String timerOutputPCollection = timerSpecMap.get(timerDeclarationId).outputCollectionId();
+ String timerInputPCollection = timerSpecMap.get(timerDeclarationId).inputCollectionId();
// Arbitrary offset.
long testTimerOffset = 123456;
@@ -270,6 +272,8 @@ public class TimerReceiverTest implements Serializable {
public void testMultiTimerScheduling() throws Exception {
final String timerId1 = "timerId1";
final String timerId2 = "timerId2";
+ final String timerDeclarationId1 = TimerDeclaration.PREFIX + timerId1;
+ final String timerDeclarationId2 = TimerDeclaration.PREFIX + timerId2;
Pipeline p = Pipeline.create();
PCollection<Integer> output =
@@ -362,9 +366,11 @@ public class TimerReceiverTest implements Serializable {
// Simulate the SDK Harness sending a timer element to the Runner Harness.
assertTrue(
- timerReceiver.receive(timerSpecMap.get(timerId1).outputCollectionId(), windowedTimer1));
+ timerReceiver.receive(
+ timerSpecMap.get(timerDeclarationId1).outputCollectionId(), windowedTimer1));
assertTrue(
- timerReceiver.receive(timerSpecMap.get(timerId2).outputCollectionId(), windowedTimer2));
+ timerReceiver.receive(
+ timerSpecMap.get(timerDeclarationId2).outputCollectionId(), windowedTimer2));
// Expect that we get a timer element when we finish.
Object expectedTimer1 =
@@ -383,12 +389,12 @@ public class TimerReceiverTest implements Serializable {
Mockito.verify(timerReceiver, Mockito.never())
.fireTimer(
- timerSpecMap.get(timerId1).inputCollectionId(),
+ timerSpecMap.get(timerDeclarationId1).inputCollectionId(),
(WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
expectedTimer1);
Mockito.verify(timerReceiver, Mockito.never())
.fireTimer(
- timerSpecMap.get(timerId2).inputCollectionId(),
+ timerSpecMap.get(timerDeclarationId2).inputCollectionId(),
(WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
expectedTimer2);
@@ -397,12 +403,12 @@ public class TimerReceiverTest implements Serializable {
timerReceiver.finish();
Mockito.verify(timerReceiver)
.fireTimer(
- timerSpecMap.get(timerId1).inputCollectionId(),
+ timerSpecMap.get(timerDeclarationId1).inputCollectionId(),
(WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
expectedTimer1);
Mockito.verify(timerReceiver)
.fireTimer(
- timerSpecMap.get(timerId2).inputCollectionId(),
+ timerSpecMap.get(timerDeclarationId2).inputCollectionId(),
(WindowedValue<KV<Object, org.apache.beam.runners.core.construction.Timer>>)
expectedTimer2);
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index 80b3f61..5bfee8f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -354,7 +354,7 @@ public class ProcessBundleDescriptors {
RunnerApi.ParDoPayload.parseFrom(
timerReference.transform().getTransform().getSpec().getPayload());
RunnerApi.TimeDomain.Enum timeDomain =
- payload.getTimerSpecsOrThrow(timerReference.localName()).getTimeDomain();
+ payload.getTimerFamilySpecsOrThrow(timerReference.localName()).getTimeDomain();
org.apache.beam.sdk.state.TimerSpec spec;
switch (timeDomain) {
case EVENT_TIME:
@@ -380,13 +380,15 @@ public class ProcessBundleDescriptors {
timerReference.transform().getTransform().getInputsMap().keySet(),
Sets.union(
payload.getSideInputsMap().keySet(),
- payload.getTimerSpecsMap().keySet()))));
+ payload.getTimerFamilySpecsMap().keySet()))));
String timerCoderId =
keyValueCoderId(
components
.getCodersOrThrow(components.getPcollectionsOrThrow(mainInputName).getCoderId())
.getComponentCoderIds(0),
- payload.getTimerSpecsOrThrow(timerReference.localName()).getTimerCoderId(),
+ payload
+ .getTimerFamilySpecsOrThrow(timerReference.localName())
+ .getTimerFamilyCoderId(),
components);
RunnerApi.PCollection timerCollectionSpec =
components
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 1df70db..8a423a2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -1135,6 +1135,9 @@ public abstract class DoFnSignature {
*/
@AutoValue
public abstract static class TimerDeclaration {
+
+ public static final String PREFIX = "ts-";
+
public abstract String id();
public abstract Field field();
@@ -1150,6 +1153,9 @@ public abstract class DoFnSignature {
*/
@AutoValue
public abstract static class TimerFamilyDeclaration {
+
+ public static final String PREFIX = "tfs-";
+
public abstract String id();
public abstract Field field();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index cea704b..31ad817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -532,7 +532,7 @@ public class DoFnSignatures {
HashMap<String, DoFnSignature.OnTimerMethod> onTimerMethodMap =
Maps.newHashMapWithExpectedSize(onTimerMethods.size());
for (Method onTimerMethod : onTimerMethods) {
- String id = onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
+ String id = TimerDeclaration.PREFIX + onTimerMethod.getAnnotation(DoFn.OnTimer.class).value();
errors.checkArgument(
fnContext.getTimerDeclarations().containsKey(id),
"Callback %s is for undeclared timer %s",
@@ -560,7 +560,9 @@ public class DoFnSignatures {
Maps.newHashMapWithExpectedSize(onTimerFamilyMethods.size());
for (Method onTimerFamilyMethod : onTimerFamilyMethods) {
- String id = onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
+ String id =
+ TimerFamilyDeclaration.PREFIX
+ + onTimerFamilyMethod.getAnnotation(DoFn.OnTimerFamily.class).value();
errors.checkArgument(
fnContext.getTimerFamilyDeclarations().containsKey(id),
"Callback %s is for undeclared timerFamily %s",
@@ -1471,14 +1473,14 @@ public class DoFnSignatures {
@Nullable
private static String getTimerId(List<Annotation> annotations) {
- DoFn.TimerId stateId = findFirstOfType(annotations, DoFn.TimerId.class);
- return stateId != null ? stateId.value() : null;
+ DoFn.TimerId timerId = findFirstOfType(annotations, DoFn.TimerId.class);
+ return timerId != null ? TimerDeclaration.PREFIX + timerId.value() : null;
}
@Nullable
private static String getTimerFamilyId(List<Annotation> annotations) {
DoFn.TimerFamily timerFamilyId = findFirstOfType(annotations, DoFn.TimerFamily.class);
- return timerFamilyId != null ? timerFamilyId.value() : null;
+ return timerFamilyId != null ? TimerFamilyDeclaration.PREFIX + timerFamilyId.value() : null;
}
@Nullable
@@ -1761,7 +1763,8 @@ public class DoFnSignatures {
for (Field field : declaredFieldsWithAnnotation(DoFn.TimerFamily.class, fnClazz, DoFn.class)) {
// TimerSpec fields may generally be private, but will be accessed via the signature
field.setAccessible(true);
- String id = field.getAnnotation(DoFn.TimerFamily.class).value();
+ String id =
+ TimerFamilyDeclaration.PREFIX + field.getAnnotation(DoFn.TimerFamily.class).value();
validateTimerFamilyField(errors, declarations, id, field);
declarations.put(id, TimerFamilyDeclaration.create(id, field));
}
@@ -1775,7 +1778,9 @@ public class DoFnSignatures {
for (Field field : declaredFieldsWithAnnotation(DoFn.TimerId.class, fnClazz, DoFn.class)) {
// TimerSpec fields may generally be private, but will be accessed via the signature
field.setAccessible(true);
- String id = field.getAnnotation(DoFn.TimerId.class).value();
+ // Add fixed prefix to avoid key collision with TimerFamily.
+ String id =
+ DoFnSignature.TimerDeclaration.PREFIX + field.getAnnotation(DoFn.TimerId.class).value();
validateTimerField(errors, declarations, id, field);
declarations.put(id, DoFnSignature.TimerDeclaration.create(id, field));
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index dc97ec8..5224865 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -60,6 +60,7 @@ import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultWatermarkEstimator;
@@ -123,7 +124,8 @@ public class DoFnInvokersTest {
}
private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
- DoFnInvokers.invokerFor(fn).invokeOnTimer(timerId, "", mockArgumentProvider);
+ DoFnInvokers.invokerFor(fn)
+ .invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
}
@Test
@@ -269,7 +271,7 @@ public class DoFnInvokersTest {
public void testDoFnWithTimer() throws Exception {
Timer mockTimer = mock(Timer.class);
final String timerId = "my-timer-id-here";
- when(mockArgumentProvider.timer(timerId)).thenReturn(mockTimer);
+ when(mockArgumentProvider.timer(TimerDeclaration.PREFIX + timerId)).thenReturn(mockTimer);
class MockFn extends DoFn<String, String> {
@TimerId(timerId)
@@ -1038,7 +1040,7 @@ public class DoFnInvokersTest {
SimpleTimerDoFn fn = new SimpleTimerDoFn();
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
- invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
+ invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
assertThat(fn.status, equalTo("OK now"));
}
@@ -1067,7 +1069,7 @@ public class DoFnInvokersTest {
SimpleTimerDoFn fn = new SimpleTimerDoFn();
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
- invoker.invokeOnTimer(timerId, "", mockArgumentProvider);
+ invoker.invokeOnTimer(TimerDeclaration.PREFIX + timerId, "", mockArgumentProvider);
assertThat(fn.window, equalTo(testWindow));
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 1414371..ee5ddc3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -72,6 +72,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimeDomain
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimestampParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures.FnAnalysisContext;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -558,6 +559,7 @@ public class DoFnSignaturesTest {
@Test
public void testWindowParamOnTimer() throws Exception {
final String timerId = "some-timer-id";
+ final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
DoFnSignature sig =
DoFnSignatures.getSignature(
@@ -572,15 +574,16 @@ public class DoFnSignaturesTest {
public void onTimer(BoundedWindow w) {}
}.getClass());
- assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(1));
+ assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(1));
assertThat(
- sig.onTimerMethods().get(timerId).extraParameters().get(0),
+ sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
instanceOf(WindowParameter.class));
}
@Test
public void testAllParamsOnTimer() throws Exception {
final String timerId = "some-timer-id";
+ final String timerDeclarationId = TimerDeclaration.PREFIX + timerId;
DoFnSignature sig =
DoFnSignatures.getSignature(
@@ -596,15 +599,15 @@ public class DoFnSignaturesTest {
@Timestamp Instant timestamp, TimeDomain timeDomain, BoundedWindow w) {}
}.getClass());
- assertThat(sig.onTimerMethods().get(timerId).extraParameters().size(), equalTo(3));
+ assertThat(sig.onTimerMethods().get(timerDeclarationId).extraParameters().size(), equalTo(3));
assertThat(
- sig.onTimerMethods().get(timerId).extraParameters().get(0),
+ sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
instanceOf(TimestampParameter.class));
assertThat(
- sig.onTimerMethods().get(timerId).extraParameters().get(1),
+ sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(1),
instanceOf(TimeDomainParameter.class));
assertThat(
- sig.onTimerMethods().get(timerId).extraParameters().get(2),
+ sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(2),
instanceOf(WindowParameter.class));
}
@@ -631,7 +634,8 @@ public class DoFnSignaturesTest {
assertThat(sig.processElement().extraParameters().size(), equalTo(2));
DoFnSignature.TimerDeclaration decl =
- sig.timerDeclarations().get(DoFnOverridingAbstractTimerUse.TIMER_ID);
+ sig.timerDeclarations()
+ .get(TimerDeclaration.PREFIX + DoFnOverridingAbstractTimerUse.TIMER_ID);
TimerParameter timerParam = (TimerParameter) sig.processElement().extraParameters().get(1);
assertThat(
@@ -656,9 +660,11 @@ public class DoFnSignaturesTest {
assertThat(sig.onTimerMethods().size(), equalTo(1));
DoFnSignature.TimerDeclaration decl =
- sig.timerDeclarations().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+ sig.timerDeclarations()
+ .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
DoFnSignature.OnTimerMethod callback =
- sig.onTimerMethods().get(DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
+ sig.onTimerMethods()
+ .get(TimerDeclaration.PREFIX + DoFnDeclaringTimerAndAbstractCallback.TIMER_ID);
assertThat(
decl.field(),
@@ -727,10 +733,11 @@ public class DoFnSignaturesTest {
public void onFoo() {}
}.getClass());
+ final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
assertThat(sig.timerDeclarations().size(), equalTo(1));
- DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+ DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
- assertThat(decl.id(), equalTo("foo"));
+ assertThat(decl.id(), equalTo(timerDeclarationId));
assertThat(decl.field().getName(), equalTo("bizzle"));
}
@@ -749,14 +756,15 @@ public class DoFnSignaturesTest {
public void onFoo(OnTimerContext c) {}
}.getClass());
+ final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
assertThat(sig.timerDeclarations().size(), equalTo(1));
- DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+ DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
- assertThat(decl.id(), equalTo("foo"));
+ assertThat(decl.id(), equalTo(timerDeclarationId));
assertThat(decl.field().getName(), equalTo("bizzle"));
assertThat(
- sig.onTimerMethods().get("foo").extraParameters().get(0),
+ sig.onTimerMethods().get(timerDeclarationId).extraParameters().get(0),
equalTo((Parameter) Parameter.onTimerContext()));
}
@@ -796,10 +804,12 @@ public class DoFnSignaturesTest {
// Test classes at the bottom of the file
DoFnSignature sig = DoFnSignatures.signatureForDoFn(new DoFnForTestSimpleTimerIdNamedDoFn());
+ final String timerDeclarationId = TimerDeclaration.PREFIX + "foo";
+
assertThat(sig.timerDeclarations().size(), equalTo(1));
- DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get("foo");
+ DoFnSignature.TimerDeclaration decl = sig.timerDeclarations().get(timerDeclarationId);
- assertThat(decl.id(), equalTo("foo"));
+ assertThat(decl.id(), equalTo(timerDeclarationId));
assertThat(
decl.field(), equalTo(DoFnForTestSimpleTimerIdNamedDoFn.class.getDeclaredField("bizzle")));
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
index b4553c9..dbf2e2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokersTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.junit.Before;
import org.junit.Rule;
@@ -52,7 +53,8 @@ public class OnTimerInvokersTest {
}
private void invokeOnTimer(DoFn<String, String> fn, String timerId) {
- OnTimerInvokers.forTimer(fn, timerId).invokeOnTimer(mockArgumentProvider);
+ OnTimerInvokers.forTimer(fn, TimerDeclaration.PREFIX + timerId)
+ .invokeOnTimer(mockArgumentProvider);
}
@Test
@@ -123,7 +125,8 @@ public class OnTimerInvokersTest {
@Test
public void testStableName() {
OnTimerInvoker<Void, Void> invoker =
- OnTimerInvokers.forTimer(new StableNameTestDoFn(), StableNameTestDoFn.TIMER_ID);
+ OnTimerInvokers.forTimer(
+ new StableNameTestDoFn(), TimerDeclaration.PREFIX + StableNameTestDoFn.TIMER_ID);
assertThat(
invoker.getClass().getName(),
@@ -132,7 +135,7 @@ public class OnTimerInvokersTest {
"%s$%s$%s$%s",
StableNameTestDoFn.class.getName(),
OnTimerInvoker.class.getSimpleName(),
- "timeridwithspecialChars" /* alphanum only; human readable but not unique */,
- "dGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */)));
+ "tstimeridwithspecialChars" /* alphanum only; human readable but not unique */,
+ "dHMtdGltZXItaWQud2l0aCBzcGVjaWFsQ2hhcnN7fQ" /* base64 encoding of UTF-8 timerId */)));
}
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index ec26195..49aa850 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -203,7 +203,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
pTransform.getInputsOrThrow(mainInput), pTransformId, (FnDataReceiver) mainInputConsumer);
// Register as a consumer for each timer PCollection.
- for (String localName : runner.parDoPayload.getTimerSpecsMap().keySet()) {
+ for (String localName : runner.parDoPayload.getTimerFamilySpecsMap().keySet()) {
TimeDomain timeDomain =
DoFnSignatures.getTimerSpecOrThrow(
runner.doFnSignature.timerDeclarations().get(localName), runner.doFn)
@@ -362,7 +362,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
pTransform.getInputsMap().keySet(),
Sets.union(
parDoPayload.getSideInputsMap().keySet(),
- parDoPayload.getTimerSpecsMap().keySet())));
+ parDoPayload.getTimerFamilySpecsMap().keySet())));
PCollection mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
Coder<?> maybeWindowedValueInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId());
// TODO: Stop passing windowed value coders within PCollections.
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 426bf85..7ed8b82 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -84,6 +84,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -802,11 +803,14 @@ public class FnApiDoFnRunnerTest implements Serializable {
RunnerApi.Pipeline pProto = PipelineTranslation.toProto(p, sdkComponents);
String inputPCollectionId = sdkComponents.registerPCollection(valuePCollection);
String outputPCollectionId = sdkComponents.registerPCollection(outputPCollection);
- String eventTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event";
- String eventTimerOutputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).event.output";
- String processingTimerInputPCollectionId = "pTransformId/ParMultiDo(TestTimerful).processing";
+ String eventTimerInputPCollectionId =
+ "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event";
+ String eventTimerOutputPCollectionId =
+ "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "event.output";
+ String processingTimerInputPCollectionId =
+ "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing";
String processingTimerOutputPCollectionId =
- "pTransformId/ParMultiDo(TestTimerful).processing.output";
+ "pTransformId/ParMultiDo(TestTimerful)." + TimerDeclaration.PREFIX + "processing.output";
RunnerApi.PTransform pTransform =
pProto
@@ -816,8 +820,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
.toBuilder()
// We need to re-write the "output" PCollections that a runner would have inserted
// on the way to a output sink.
- .putOutputs("event", eventTimerOutputPCollectionId)
- .putOutputs("processing", processingTimerOutputPCollectionId)
+ .putOutputs(TimerDeclaration.PREFIX + "event", eventTimerOutputPCollectionId)
+ .putOutputs(TimerDeclaration.PREFIX + "processing", processingTimerOutputPCollectionId)
.build();
FakeBeamFnStateClient fakeClient =
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 96789e6..8ccd2d4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -253,8 +253,7 @@ class FnApiRunner(runner.PipelineRunner):
if payload.requests_finalization:
expected_requirements.add(
common_urns.requirements.REQUIRES_BUNDLE_FINALIZATION.urn)
- if (payload.state_specs or payload.timer_specs or
- payload.timer_family_specs):
+ if (payload.state_specs or payload.timer_family_specs):
expected_requirements.add(
common_urns.requirements.REQUIRES_STATEFUL_PROCESSING.urn)
if payload.requires_stable_input:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index 884fb48..83c9315 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -255,7 +255,7 @@ class Stage(object):
user_states.append(
beam_runner_api_pb2.ExecutableStagePayload.UserStateId(
transform_id=transform_id, local_name=tag))
- for tag in payload.timer_specs.keys():
+ for tag in payload.timer_family_specs.keys():
timers.append(
beam_runner_api_pb2.ExecutableStagePayload.TimerId(
transform_id=transform_id, local_name=tag))
@@ -633,7 +633,7 @@ def annotate_stateful_dofns_as_roots(stages, pipeline_context):
if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
pardo_payload = proto_utils.parse_Bytes(
transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- if pardo_payload.state_specs or pardo_payload.timer_specs:
+ if pardo_payload.state_specs or pardo_payload.timer_family_specs:
stage.forced_root = True
yield stage
@@ -1319,7 +1319,7 @@ def inject_timer_pcollections(stages, pipeline_context):
if transform.spec.urn in PAR_DO_URNS:
payload = proto_utils.parse_Bytes(
transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- for tag, spec in payload.timer_specs.items():
+ for tag, spec in payload.timer_family_specs.items():
if len(transform.inputs) > 1:
raise NotImplementedError('Timers and side inputs.')
input_pcoll = pipeline_context.components.pcollections[next(
@@ -1334,7 +1334,9 @@ def inject_timer_pcollections(stages, pipeline_context):
beam_runner_api_pb2.Coder(
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.coders.KV.urn),
- component_coder_ids=[key_coder_id, spec.timer_coder_id]))
+ component_coder_ids=[
+ key_coder_id, spec.timer_family_coder_id
+ ]))
# Inject the read and write pcollections.
timer_read_pcoll = unique_name(
pipeline_context.components.pcollections,
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index a963a08..9180695 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -601,7 +601,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
transform_id, # type: str
key_coder, # type: coders.Coder
window_coder, # type: coders.Coder
- timer_specs # type: Mapping[str, beam_runner_api_pb2.TimerSpec]
+ timer_family_specs # type: Mapping[str, beam_runner_api_pb2.TimerFamilySpec]
):
# type: (...) -> None
@@ -612,14 +612,14 @@ class FnApiUserStateContext(userstate.UserStateContext):
transform_id: The name of the PTransform that this context is associated.
key_coder:
window_coder:
- timer_specs: A list of ``userstate.TimerSpec`` objects specifying the
- timers associated with this operation.
+ timer_family_specs: A mapping from timer tag to the ``TimerFamilySpec``
+ proto describing each timer associated with this operation.
"""
self._state_handler = state_handler
self._transform_id = transform_id
self._key_coder = key_coder
self._window_coder = window_coder
- self._timer_specs = timer_specs
+ self._timer_family_specs = timer_family_specs
self._timer_receivers = None # type: Optional[Dict[str, operations.ConsumerSet]]
self._all_states = {
} # type: Dict[tuple, userstate.AccumulatingRuntimeState]
@@ -629,7 +629,7 @@ class FnApiUserStateContext(userstate.UserStateContext):
"""TODO"""
self._timer_receivers = {}
- for tag in self._timer_specs:
+ for tag in self._timer_family_specs:
self._timer_receivers[tag] = receivers.pop(tag)
def get_timer(
@@ -1478,7 +1478,7 @@ def _create_pardo_operation(
if pardo_proto:
other_input_tags = set.union(
set(pardo_proto.side_inputs),
- set(pardo_proto.timer_specs)) # type: Container[str]
+ set(pardo_proto.timer_family_specs)) # type: Container[str]
else:
other_input_tags = ()
pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items()
@@ -1488,12 +1488,12 @@ def _create_pardo_operation(
serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing, ))
timer_inputs = None # type: Optional[Dict[str, str]]
- if pardo_proto and (pardo_proto.timer_specs or pardo_proto.state_specs or
- pardo_proto.restriction_coder_id):
+ if pardo_proto and (pardo_proto.timer_family_specs or pardo_proto.state_specs
+ or pardo_proto.restriction_coder_id):
main_input_coder = None # type: Optional[WindowedValueCoder]
timer_inputs = {}
for tag, pcoll_id in transform_proto.inputs.items():
- if tag in pardo_proto.timer_specs:
+ if tag in pardo_proto.timer_family_specs:
timer_inputs[tag] = pcoll_id
elif tag in pardo_proto.side_inputs:
pass
@@ -1504,13 +1504,13 @@ def _create_pardo_operation(
main_input_coder = factory.get_windowed_coder(pcoll_id)
assert main_input_coder is not None
- if pardo_proto.timer_specs or pardo_proto.state_specs:
+ if pardo_proto.timer_family_specs or pardo_proto.state_specs:
user_state_context = FnApiUserStateContext(
factory.state_handler,
transform_id,
main_input_coder.key_coder(),
main_input_coder.window_coder,
- timer_specs=pardo_proto.timer_specs
+ timer_family_specs=pardo_proto.timer_family_specs
) # type: Optional[FnApiUserStateContext]
else:
user_state_context = None
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8ebffbd..4f502c5 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1325,7 +1325,7 @@ class ParDo(PTransformWithSideInputs):
spec.name: spec.to_runner_api(context)
for spec in state_specs
},
- timer_specs={
+ timer_family_specs={
spec.name: spec.to_runner_api(context)
for spec in timer_specs
},
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 70cdca2..f3ffa88 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -131,8 +131,10 @@ class CombiningValueStateSpec(StateSpec):
class TimerSpec(object):
"""Specification for a user stateful DoFn timer."""
+ prefix = "ts-"
+
def __init__(self, name, time_domain):
- self.name = name
+ self.name = self.prefix + name
if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, ))
self.time_domain = time_domain
@@ -142,10 +144,30 @@ class TimerSpec(object):
return '%s(%s)' % (self.__class__.__name__, self.name)
def to_runner_api(self, context):
- # type: (PipelineContext) -> beam_runner_api_pb2.TimerSpec
- return beam_runner_api_pb2.TimerSpec(
+ # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec
+ return beam_runner_api_pb2.TimerFamilySpec(
+ time_domain=TimeDomain.to_runner_api(self.time_domain),
+ timer_family_coder_id=context.coders.get_id(
+ coders._TimerCoder(coders.SingletonCoder(None))))
+
+
+# TODO(BEAM-9602): Provide support for dynamic timer.
+class TimerFamilySpec(object):
+ prefix = "tfs-"
+
+ def __init__(self, name, time_domain):
+ self.name = self.prefix + name
+ if time_domain not in (TimeDomain.WATERMARK, TimeDomain.REAL_TIME):
+ raise ValueError('Unsupported TimeDomain: %r.' % (time_domain, ))
+ self.time_domain = time_domain
+
+ def __repr__(self):
+ return '%s(%s)' % (self.__class__.__name__, self.name)
+
+ def to_runner_api(self, context):
+ return beam_runner_api_pb2.TimerFamilySpec(
time_domain=TimeDomain.to_runner_api(self.time_domain),
- timer_coder_id=context.coders.get_id(
+ timer_family_coder_id=context.coders.get_id(
coders._TimerCoder(coders.SingletonCoder(None))))
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 75d9416..8ad3e95 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -265,7 +265,7 @@ class InterfaceTest(unittest.TestCase):
# different timer callbacks.
with self.assertRaisesRegex(
ValueError,
- r'Multiple on_timer callbacks registered for TimerSpec\(expiry1\).'):
+ r'Multiple on_timer callbacks registered for TimerSpec\(.*expiry1\).'):
class StatefulDoFnWithTimerWithTypo1(DoFn): # pylint: disable=unused-variable
BUFFER_STATE = BagStateSpec('buffer', BytesCoder())
@@ -315,7 +315,7 @@ class InterfaceTest(unittest.TestCase):
dofn = StatefulDoFnWithTimerWithTypo2()
with self.assertRaisesRegex(
ValueError,
- (r'The on_timer callback for TimerSpec\(expiry1\) is not the '
+ (r'The on_timer callback for TimerSpec\(.*expiry1\) is not the '
r'specified .on_expiry_1 method for DoFn '
r'StatefulDoFnWithTimerWithTypo2 \(perhaps it was overwritten\?\).')):
validate_stateful_dofn(dofn)
@@ -348,7 +348,7 @@ class InterfaceTest(unittest.TestCase):
with self.assertRaisesRegex(
ValueError,
(r'DoFn StatefulDoFnWithTimerWithTypo3 has a TimerSpec without an '
- r'associated on_timer callback: TimerSpec\(expiry2\).')):
+ r'associated on_timer callback: TimerSpec\(.*expiry2\).')):
validate_stateful_dofn(dofn)