You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/14 01:48:52 UTC
[1/3] incubator-beam git commit: Revert "Allow stateful DoFn in DataflowRunner"
Repository: incubator-beam
Updated Branches:
refs/heads/release-0.4.0-incubating b2780881a -> 10bb4767a
Revert "Allow stateful DoFn in DataflowRunner"
This reverts commit 42bb15d2df28b99b6788010450f41f2932095771.
The Dataflow service has introduced a bug that was masked because
various tests had been disabled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1af44fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1af44fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1af44fa
Branch: refs/heads/release-0.4.0-incubating
Commit: c1af44fa27633fd2a9592a13579415f6b974cfe6
Parents: f78d960
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 13 16:36:42 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 13 16:57:26 2016 -0800
----------------------------------------------------------------------
.../dataflow/DataflowPipelineTranslator.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1af44fa/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8048df9..a56690c 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -77,7 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -956,6 +955,7 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -985,6 +985,7 @@ public class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform,
TranslationContext context) {
+ rejectStatefulDoFn(transform.getNewFn());
context.addStep(transform, "ParallelDo");
translateInputs(context.getInput(transform), transform.getSideInputs(), context);
@@ -1032,6 +1033,18 @@ public class DataflowPipelineTranslator {
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
+ private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
+ if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ DataflowRunner.class.getSimpleName()));
+ }
+ }
+
private static void translateInputs(
PCollection<?> input,
List<PCollectionView<?>> sideInputs,
@@ -1063,9 +1076,6 @@ public class DataflowPipelineTranslator {
TranslationContext context,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
-
- DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
-
context.addInput(PropertyNames.USER_FN, fn.getClass().getName());
context.addInput(
PropertyNames.SERIALIZED_FN,
@@ -1073,10 +1083,6 @@ public class DataflowPipelineTranslator {
serializeToByteArray(
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
-
- if (signature.isStateful()) {
- context.addInput(PropertyNames.USES_KEYED_STATE, "true");
- }
}
private static BiMap<Long, TupleTag<?>> translateOutputs(
[2/3] incubator-beam git commit: Re-exclude UsesStatefulParDo tests for Dataflow
Posted by ke...@apache.org.
Re-exclude UsesStatefulParDo tests for Dataflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f78d9606
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f78d9606
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f78d9606
Branch: refs/heads/release-0.4.0-incubating
Commit: f78d96069b428356609f219357cbf0702ec56c26
Parents: b278088
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 13 16:09:57 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 13 16:57:26 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f78d9606/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 1543c0e..93e4054 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,6 +78,7 @@
<id>runnable-on-service-tests</id>
<configuration>
<excludedGroups>
+ org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
[3/3] incubator-beam git commit: This closes #1607
Posted by ke...@apache.org.
This closes #1607
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/10bb4767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/10bb4767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/10bb4767
Branch: refs/heads/release-0.4.0-incubating
Commit: 10bb4767a1f989a1a75778828c07d9c72c450495
Parents: b278088 c1af44f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 13 17:08:42 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 13 17:08:42 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
.../dataflow/DataflowPipelineTranslator.java | 22 +++++++++++++-------
2 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------