You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/20 21:05:56 UTC

[3/4] beam git commit: Set USES_KEYED_STATE in Dataflow when DoFn uses timers

Set USES_KEYED_STATE in Dataflow when DoFn uses timers


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f535d658
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f535d658
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f535d658

Branch: refs/heads/master
Commit: f535d658b551f56ebe4f2c77948fef63be7f44dc
Parents: 978ff55
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 20 11:36:52 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 20 11:36:52 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/runners/dataflow/DataflowPipelineTranslator.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f535d658/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index cfb3d1a..babbe69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -943,7 +943,7 @@ public class DataflowPipelineTranslator {
                 DoFnInfo.forFn(
                     fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
 
-    if (signature.usesState()) {
+    if (signature.usesState() || signature.usesTimers()) {
       stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
     }
   }