You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 23:06:06 UTC
[2/2] incubator-beam git commit: Allow stateful DoFn in DataflowRunner
Allow stateful DoFn in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/42bb15d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/42bb15d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/42bb15d2
Branch: refs/heads/master
Commit: 42bb15d2df28b99b6788010450f41f2932095771
Parents: c72708c
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 13:51:19 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 6 15:05:53 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 -
.../dataflow/DataflowPipelineTranslator.java | 22 +++++++-------------
2 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42bb15d2/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index adebb2a..9ead74a 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,7 +78,6 @@
<id>runnable-on-service-tests</id>
<configuration>
<excludedGroups>
- org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
<excludes>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/42bb15d2/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 1cff42a..f43e176 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -77,6 +77,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -960,7 +961,6 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
- rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -990,7 +990,6 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
- rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -1038,18 +1037,6 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
- private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
- if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- DataflowRunner.class.getSimpleName()));
- }
- }
-
private static void translateInputs(
PCollection<?> input,
List<PCollectionView<?>> sideInputs,
@@ -1081,6 +1068,9 @@ public class DataflowPipelineTranslator {
TranslationContext context,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
+
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
context.addInput(
PropertyNames.SERIALIZED_FN,
@@ -1088,6 +1078,10 @@ public class DataflowPipelineTranslator {
serializeToByteArray(
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
+
+ if (signature.isStateful()) {
+ context.addInput(PropertyNames.USES_KEYED_STATE, "true");
+ }
}
private static BiMap<Long, TupleTag<?>> translateOutputs(