You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/29 17:34:55 UTC
[1/3] incubator-beam git commit: This closes #1411
Repository: incubator-beam
Updated Branches:
refs/heads/master 3a8b9b521 -> 1716bfc49
This closes #1411
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1716bfc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1716bfc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1716bfc4
Branch: refs/heads/master
Commit: 1716bfc4906b97bae434d9740a0af172c91f5e10
Parents: 3a8b9b5 31a55f4
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 29 09:24:55 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
.../dataflow/DataflowPipelineTranslator.java | 18 ++++++++++++++++++
.../sdk/transforms/reflect/DoFnSignature.java | 7 +++++++
3 files changed, 26 insertions(+)
----------------------------------------------------------------------
[3/3] incubator-beam git commit: Add isStateful() to DoFnSignature
Posted by ke...@apache.org.
Add isStateful() to DoFnSignature
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bdd3e086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bdd3e086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bdd3e086
Branch: refs/heads/master
Commit: bdd3e0862b91ac682336eb8ff489fff104ea927d
Parents: 3a8b9b5
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 28 16:15:55 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/reflect/DoFnSignature.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bdd3e086/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index cd93583..0750949 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -105,6 +105,13 @@ public abstract class DoFnSignature {
@Nullable
public abstract Map<String, OnTimerMethod> onTimerMethods();
+ /**
+ * Whether the {@link DoFn} described by this signature uses state.
+ */
+ public boolean isStateful() {
+ return stateDeclarations().size() > 0;
+ }
+
static Builder builder() {
return new AutoValue_DoFnSignature.Builder();
}
[2/3] incubator-beam git commit: Reject stateful DoFn in
DataflowRunner
Posted by ke...@apache.org.
Reject stateful DoFn in DataflowRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/31a55f40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/31a55f40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/31a55f40
Branch: refs/heads/master
Commit: 31a55f407473f23a61cf6dfe42c3f6f4c7880920
Parents: bdd3e08
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:35:03 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 29 09:24:55 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
.../dataflow/DataflowPipelineTranslator.java | 18 ++++++++++++++++++
2 files changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index da3a4d6..59276e4 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -77,6 +77,7 @@
<execution>
<id>runnable-on-service-tests</id>
<configuration>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<excludes>
<exclude>org.apache.beam.sdk.transforms.FlattenTest</exclude>
</excludes>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/31a55f40/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 9acf071..0549d5b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.runners.TransformTreeNode;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
@@ -77,6 +78,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
@@ -959,6 +961,8 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
+
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
BiMap<Long, TupleTag<?>> outputMap =
@@ -987,6 +991,8 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
+
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
long mainOutput = context.addOutput(context.getOutput(transform));
@@ -1033,6 +1039,18 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
+ private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+ if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ DataflowRunner.class.getSimpleName()));
+ }
+ }
+
private static void translateInputs(
PCollection<?> input,
List<PCollectionView<?>> sideInputs,