You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/05/09 03:55:38 UTC

[beam] branch master updated: [BEAM-9940] Set timer family spec for TimerDeclarations in dataflow runner

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ef3d2f  [BEAM-9940] Set timer family spec for TimerDeclarations in dataflow runner
     new eea3a19  Merge pull request #11649 from y1chi/BEAM-9940
5ef3d2f is described below

commit 5ef3d2f934f325a5db65b2942f70fb08f35211b5
Author: Yichi Zhang <zy...@google.com>
AuthorDate: Fri May 8 18:42:19 2020 -0700

    [BEAM-9940] Set timer family spec for TimerDeclarations in dataflow runner
---
 .../beam/runners/dataflow/PrimitiveParDoSingleFactory.java    | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
index b6b60e3..3256848 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java
@@ -22,6 +22,7 @@ import static org.apache.beam.runners.core.construction.ParDoTranslation.transla
 import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getStateSpecOrThrow;
 import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerFamilySpecOrThrow;
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignatures.getTimerSpecOrThrow;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
@@ -250,6 +251,16 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT>
                         windowCoder);
                 timerFamilySpecs.put(timerFamily.getKey(), spec);
               }
+              for (Map.Entry<String, DoFnSignature.TimerDeclaration> timer :
+                  signature.timerDeclarations().entrySet()) {
+                RunnerApi.TimerFamilySpec spec =
+                    translateTimerFamilySpec(
+                        getTimerSpecOrThrow(timer.getValue(), doFn),
+                        newComponents,
+                        keyCoder,
+                        windowCoder);
+                timerFamilySpecs.put(timer.getKey(), spec);
+              }
               return timerFamilySpecs;
             }