You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/29 17:34:56 UTC

[2/3] incubator-beam git commit: Reject stateful DoFn in DataflowRunner

Reject stateful DoFn in DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31a55f40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31a55f40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31a55f40

Branch: refs/heads/master
Commit: 31a55f407473f23a61cf6dfe42c3f6f4c7880920
Parents: bdd3e08
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:35:03 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml        |  1 +
 .../dataflow/DataflowPipelineTranslator.java      | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index da3a4d6..59276e4 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -77,6 +77,7 @@
           <execution>
             <id>runnable-on-service-tests</id>
             <configuration>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
               <excludes>
                 <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>
               </excludes>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9acf071..0549d5b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -77,6 +78,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -959,6 +961,8 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
+
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
@@ -987,6 +991,8 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
+
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             long mainOutput = context.addOutput(context.getOutput(transform));
@@ -1033,6 +1039,18 @@ public class DataflowPipelineTranslator {
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 
+  /** Throws if {@code doFn} is stateful, since this runner does not yet support state. */
+  private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+    if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+      throw new UnsupportedOperationException(String.format(
+          "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+          DoFn.StateId.class.getSimpleName(),
+          doFn.getClass().getName(),
+          DoFn.class.getSimpleName(),
+          DataflowRunner.class.getSimpleName()));
+    }
+  }
+
   private static void translateInputs(
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,