You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/07 02:00:55 UTC
[1/3] beam git commit: Update Dataflow worker to beam-master-20170106
Repository: beam
Updated Branches:
refs/heads/master a6caa82a6 -> 418c597c4
Update Dataflow worker to beam-master-20170106
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5af7f42d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5af7f42d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5af7f42d
Branch: refs/heads/master
Commit: 5af7f42debcfb119c1ff89fcda897a76528bd4f7
Parents: a6caa82
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 10:30:43 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 14:02:08 2017 -0800
----------------------------------------------------------------------
.../org/apache/beam/runners/dataflow/dataflow.properties | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5af7f42d/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 399146d..77345d2 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
environment.major.version=6
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170103
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170106
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170103
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170106
[3/3] beam git commit: This closes #1742: Rollforwards "Allow
stateful DoFn in DataflowRunner"
Posted by ke...@apache.org.
This closes #1742: Rollforwards "Allow stateful DoFn in DataflowRunner"
Rollforwards "Allow stateful DoFn in DataflowRunner"
Update Dataflow worker to beam-master-20170106
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/418c597c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/418c597c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/418c597c
Branch: refs/heads/master
Commit: 418c597c444e399b53f8159654810f6caa75e420
Parents: a6caa82 d583a1c
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 17:49:54 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 17:49:54 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 -
.../dataflow/DataflowPipelineTranslator.java | 22 +++++++-------------
.../beam/runners/dataflow/dataflow.properties | 4 ++--
3 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Rollforwards "Allow stateful DoFn in
DataflowRunner"
Posted by ke...@apache.org.
Rollforwards "Allow stateful DoFn in DataflowRunner"
This rolls forward 42bb15d, allowing stateful DoFn in DataflowRunner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d583a1ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d583a1ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d583a1ce
Branch: refs/heads/master
Commit: d583a1cedfd0c6abc9ca5059009965186b51e040
Parents: 5af7f42
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 19:15:24 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 14:04:05 2017 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 -
.../dataflow/DataflowPipelineTranslator.java | 22 +++++++-------------
2 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d583a1ce/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 64727b1..7bf2089 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,7 +78,6 @@
<id>runnable-on-service-tests</id>
<configuration>
<excludedGroups>
- org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesMetrics
http://git-wip-us.apache.org/repos/asf/beam/blob/d583a1ce/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8e5901e..524c30b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -836,7 +837,6 @@ class DataflowPipelineTranslator {
private <InputT, OutputT> void translateMultiHelper(
ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
- DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn());
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
translateInputs(
@@ -865,7 +865,6 @@ class DataflowPipelineTranslator {
private <InputT, OutputT> void translateSingleHelper(
ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- rejectStatefulDoFn(transform.getFn());
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
translateInputs(
@@ -914,18 +913,6 @@ class DataflowPipelineTranslator {
registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
}
- private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
- if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- DataflowRunner.class.getSimpleName()));
- }
- }
-
private static void translateInputs(
StepTranslationContext stepContext,
PCollection<?> input,
@@ -960,6 +947,9 @@ class DataflowPipelineTranslator {
TranslationContext context,
long mainOutput,
Map<Long, TupleTag<?>> outputMap) {
+
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
@@ -967,6 +957,10 @@ class DataflowPipelineTranslator {
serializeToByteArray(
DoFnInfo.forFn(
fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
+
+ if (signature.isStateful()) {
+ stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
+ }
}
private static BiMap<Long, TupleTag<?>> translateOutputs(