You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/20 21:05:56 UTC
[3/4] beam git commit: Set USES_KEYED_STATE in Dataflow when DoFn uses timers
Set USES_KEYED_STATE in Dataflow when DoFn uses timers
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f535d658
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f535d658
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f535d658
Branch: refs/heads/master
Commit: f535d658b551f56ebe4f2c77948fef63be7f44dc
Parents: 978ff55
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 20 11:36:52 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 20 11:36:52 2017 -0800
----------------------------------------------------------------------
.../apache/beam/runners/dataflow/DataflowPipelineTranslator.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f535d658/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index cfb3d1a..babbe69 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -943,7 +943,7 @@ public class DataflowPipelineTranslator {
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
- if (signature.usesState()) {
+ if (signature.usesState() || signature.usesTimers()) {
stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
}
}