You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text version).
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/07 02:00:55 UTC

[1/3] beam git commit: Update Dataflow worker to beam-master-20170106

Repository: beam
Updated Branches:
  refs/heads/master a6caa82a6 -> 418c597c4


Update Dataflow worker to beam-master-20170106


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5af7f42d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5af7f42d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5af7f42d

Branch: refs/heads/master
Commit: 5af7f42debcfb119c1ff89fcda897a76528bd4f7
Parents: a6caa82
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 10:30:43 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 14:02:08 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/runners/dataflow/dataflow.properties         | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5af7f42d/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
index 399146d..77345d2 100644
--- a/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
+++ b/runners/google-cloud-dataflow-java/src/main/resources/org/apache/beam/runners/dataflow/dataflow.properties
@@ -18,6 +18,6 @@
 
 environment.major.version=6
 
-worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170103
+worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20170106
 
-worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170103
+worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20170106


[3/3] beam git commit: This closes #1742: Rollforwards "Allow stateful DoFn in DataflowRunner"

Posted by ke...@apache.org.
This closes #1742: Rollforwards "Allow stateful DoFn in DataflowRunner"

  Rollforwards "Allow stateful DoFn in DataflowRunner"
  Update Dataflow worker to beam-master-20170106


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/418c597c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/418c597c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/418c597c

Branch: refs/heads/master
Commit: 418c597c444e399b53f8159654810f6caa75e420
Parents: a6caa82 d583a1c
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jan 6 17:49:54 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 17:49:54 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  1 -
 .../dataflow/DataflowPipelineTranslator.java    | 22 +++++++-------------
 .../beam/runners/dataflow/dataflow.properties   |  4 ++--
 3 files changed, 10 insertions(+), 17 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Rollforwards "Allow stateful DoFn in DataflowRunner"

Posted by ke...@apache.org.
Rollforwards "Allow stateful DoFn in DataflowRunner"

This rolls forward (re-applies) commit 42bb15d, allowing stateful DoFns in the DataflowRunner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d583a1ce
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d583a1ce
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d583a1ce

Branch: refs/heads/master
Commit: d583a1cedfd0c6abc9ca5059009965186b51e040
Parents: 5af7f42
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Jan 5 19:15:24 2017 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Jan 6 14:04:05 2017 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |  1 -
 .../dataflow/DataflowPipelineTranslator.java    | 22 +++++++-------------
 2 files changed, 8 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d583a1ce/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 64727b1..7bf2089 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,7 +78,6 @@
             <id>runnable-on-service-tests</id>
             <configuration>
               <excludedGroups>
-                org.apache.beam.sdk.testing.UsesStatefulParDo,
                 org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo,
                 org.apache.beam.sdk.testing.UsesMetrics

http://git-wip-us.apache.org/repos/asf/beam/blob/d583a1ce/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 8e5901e..524c30b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -79,6 +79,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -836,7 +837,6 @@ class DataflowPipelineTranslator {
 
           private <InputT, OutputT> void translateMultiHelper(
               ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-            DataflowPipelineTranslator.rejectStatefulDoFn(transform.getFn());
 
             StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
             translateInputs(
@@ -865,7 +865,6 @@ class DataflowPipelineTranslator {
 
           private <InputT, OutputT> void translateSingleHelper(
               ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-            rejectStatefulDoFn(transform.getFn());
 
             StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
             translateInputs(
@@ -914,18 +913,6 @@ class DataflowPipelineTranslator {
     registerTransformTranslator(Read.Bounded.class, new ReadTranslator());
   }
 
-  private static void rejectStatefulDoFn(DoFn<?, ?> doFn) {
-    if (DoFnSignatures.getSignature(doFn.getClass()).isStateful()) {
-    throw new UnsupportedOperationException(
-        String.format(
-            "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-            DoFn.StateId.class.getSimpleName(),
-            doFn.getClass().getName(),
-            DoFn.class.getSimpleName(),
-            DataflowRunner.class.getSimpleName()));
-    }
-  }
-
   private static void translateInputs(
       StepTranslationContext stepContext,
       PCollection<?> input,
@@ -960,6 +947,9 @@ class DataflowPipelineTranslator {
       TranslationContext context,
       long mainOutput,
       Map<Long, TupleTag<?>> outputMap) {
+
+    DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+
     stepContext.addInput(PropertyNames.USER_FN, fn.getClass().getName());
     stepContext.addInput(
         PropertyNames.SERIALIZED_FN,
@@ -967,6 +957,10 @@ class DataflowPipelineTranslator {
             serializeToByteArray(
                 DoFnInfo.forFn(
                     fn, windowingStrategy, sideInputs, inputCoder, mainOutput, outputMap))));
+
+    if (signature.isStateful()) {
+      stepContext.addInput(PropertyNames.USES_KEYED_STATE, "true");
+    }
   }
 
   private static BiMap<Long, TupleTag<?>> translateOutputs(