You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/29 17:34:56 UTC
[2/3] incubator-beam git commit: Reject stateful DoFn in DataflowRunner
Reject stateful DoFn in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31a55f40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31a55f40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31a55f40
Branch: refs/heads/master
Commit: 31a55f407473f23a61cf6dfe42c3f6f4c7880920
Parents: bdd3e08
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:35:03 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
.../dataflow/DataflowPipelineTranslator.java | 18 ++++++++++++++++++
2 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index da3a4d6..59276e4 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -77,6 +77,7 @@
<execution>
<id>runnable-on-service-tests</id>
<configuration>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<excludes>
<exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>
</excludes>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9acf071..0549d5b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -77,6 +78,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -959,6 +961,8 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
+
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
BiMap<Long, TupleTag<?>> outputMap =
@@ -987,6 +991,8 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
+
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
long mainOutput = context.addOutput(context.getOutput(transform));
@@ -1033,6 +1039,18 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
+ private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+ if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ DataflowRunner.class.getSimpleName()));
+ }
+ }
+
private static void translateInputs(
PCollection<?> input,
List<PCollectionView<?>> sideInputs,