You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/29 17:34:55 UTC

[1/3] incubator-beam git commit: This closes #1411

Repository: incubator-beam
Updated Branches:
  refs/heads/master 3a8b9b521 -> 1716bfc49


This closes #1411


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1716bfc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1716bfc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1716bfc4

Branch: refs/heads/master
Commit: 1716bfc4906b97bae434d9740a0af172c91f5e10
Parents: 3a8b9b5 31a55f4
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 29 09:24:55 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml        |  1 +
 .../dataflow/DataflowPipelineTranslator.java      | 18 ++++++++++++++++++
 .../sdk/transforms/reflect/DoFnSignature.java     |  7 +++++++
 3 files changed, 26 insertions(+)
----------------------------------------------------------------------



[3/3] incubator-beam git commit: Add isStateful() to DoFnSignature

Posted by ke...@apache.org.
Add isStateful() to DoFnSignature


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bdd3e086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bdd3e086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bdd3e086

Branch: refs/heads/master
Commit: bdd3e0862b91ac682336eb8ff489fff104ea927d
Parents: 3a8b9b5
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 28 16:15:55 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/reflect/DoFnSignature.java | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdd3e086/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index cd93583..0750949 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -105,6 +105,13 @@ public abstract class DoFnSignature {
   @Nullable
   public abstract Map<String, OnTimerMethod> onTimerMethods();
 
+  /**
+   * Returns {@code true} if the described {@link DoFn} has at least one state declaration.
+   */
+  public boolean isStateful() {
+    return !stateDeclarations().isEmpty();
+  }
+
   static Builder builder() {
     return new AutoValue_DoFnSignature.Builder();
   }


[2/3] incubator-beam git commit: Reject stateful DoFn in DataflowRunner

Posted by ke...@apache.org.
Reject stateful DoFn in DataflowRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31a55f40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31a55f40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31a55f40

Branch: refs/heads/master
Commit: 31a55f407473f23a61cf6dfe42c3f6f4c7880920
Parents: bdd3e08
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:35:03 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml        |  1 +
 .../dataflow/DataflowPipelineTranslator.java      | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index da3a4d6..59276e4 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -77,6 +77,7 @@
           <execution>
             <id>runnable-on-service-tests</id>
             <configuration>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
               <excludes>
                 <exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>
               </excludes>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9acf071..0549d5b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
@@ -77,6 +78,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -959,6 +961,8 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
+
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             BiMap<Long, TupleTag<?>> outputMap =
@@ -987,6 +991,8 @@ public class DataflowPipelineTranslator {
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform,
               TranslationContext context) {
+            rejectStatefulDoFn(transform.getNewFn());
+
             context.addStep(transform, "ParallelDo");
             translateInputs(context.getInput(transform), transform.getSideInputs(), context);
             long mainOutput = context.addOutput(context.getOutput(transform));
@@ -1033,6 +1039,18 @@ public class DataflowPipelineTranslator {
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 
+  private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+    // Stateful DoFns are rejected up front; the message names the offending DoFn class.
+    if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+      throw new UnsupportedOperationException(String.format(
+          "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+          DoFn.StateId.class.getSimpleName(),
+          doFn.getClass().getName(),
+          DoFn.class.getSimpleName(),
+          DataflowRunner.class.getSimpleName()));
+    }
+  }
+
   private static void translateInputs(
       PCollection<?> input,
       List<PCollectionView<?>> sideInputs,