You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/11/24 00:03:06 UTC

[06/11] incubator-beam git commit: Reject stateful DoFn in ApexRunner

Reject stateful DoFn in ApexRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/796ba7ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/796ba7ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/796ba7ab

Branch: refs/heads/python-sdk
Commit: 796ba7ab75bc8d01a3a59efc29cdc17bcd26af4a
Parents: 413a402
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  1 +
 .../translation/ParDoBoundMultiTranslator.java  | 67 +++++++++++++-------
 .../apex/translation/ParDoBoundTranslator.java  | 46 +++++++++-----
 3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
             </goals>
             <configuration>
               <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
 import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            transform.getMainOutputTag(),
+            transform.getSideOutputTags().getAll(),
+            context.<PCollection<?>>getInput().getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
 
     Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
@@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+  static void addSideInputs(
+      ApexParDoOperator<?, ?> operator,
+      List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
     Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
     if (sideInputs.size() > sideInputPorts.length) {
@@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
+  private static PCollection<?> unionSideInputs(
+      List<PCollectionView<?>> sideInputs, TranslationContext context) {
     checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
     // flatten and assign union tag
     List<PCollection<Object>> sourceCollections = new ArrayList<>();
@@ -115,13 +132,16 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     for (int i = 0; i < sideInputs.size(); i++) {
       PCollectionView<?> sideInput = sideInputs.get(i);
       PCollection<?> sideInputCollection = context.getViewInput(sideInput);
-      if (!sideInputCollection.getWindowingStrategy().equals(
-          firstSideInput.getWindowingStrategy())) {
+      if (!sideInputCollection
+          .getWindowingStrategy()
+          .equals(firstSideInput.getWindowingStrategy())) {
         // TODO: check how to handle this in stream codec
         //String msg = "Multiple side inputs with different window strategies.";
         //throw new UnsupportedOperationException(msg);
-        LOG.warn("Side inputs union with different windowing strategies {} {}",
-            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+        LOG.warn(
+            "Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(),
+            sideInputCollection.getWindowingStrategy());
       }
       if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
         String msg = "Multiple side inputs with different coders.";
@@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
       unionTags.put(sideInputCollection, i);
     }
 
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
-        firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
-        context);
+    PCollection<Object> resultCollection =
+        FlattenPCollectionTranslator.intermediateCollection(
+            firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(
+        sourceCollections, unionTags, resultCollection, context);
     return resultCollection;
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/796ba7ab/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index c1ebbd5..7a918a7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.apex.translation;
 
 import java.util.List;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-/**
- * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoBoundTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+/** {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */
+class ParDoBoundTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.Bound<InputT, OutputT>> {
   private static final long serialVersionUID = 1L;
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
-        output.getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            new TupleTag<OutputT>(),
+            TupleTagList.empty().getAll() /*sideOutputTags*/,
+            output.getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {
-       ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+      ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
     }
   }
 }