You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/22 19:06:27 UTC

[1/3] incubator-beam git commit: Reject stateful DoFn in ApexRunner

Repository: incubator-beam
Updated Branches:
  refs/heads/master e7d7aa938 -> 6ec45f7e7


Reject stateful DoFn in ApexRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85cea78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85cea78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85cea78

Branch: refs/heads/master
Commit: e85cea78253d2f316a18d95d65aabc1176448841
Parents: f8b6bb7
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:32:47 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  1 +
 .../translation/ParDoBoundMultiTranslator.java  | 67 +++++++++++++-------
 .../apex/translation/ParDoBoundTranslator.java  | 46 +++++++++-----
 3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
             </goals>
             <configuration>
               <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
 import com.google.common.collect.Maps;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
 
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
-        context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            transform.getMainOutputTag(),
+            transform.getSideOutputTags().getAll(),
+            context.<PCollection<?>>getInput().getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
 
     Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
     Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
@@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+  static void addSideInputs(
+      ApexParDoOperator<?, ?> operator,
+      List<PCollectionView<?>> sideInputs,
       TranslationContext context) {
     Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
     if (sideInputs.size() > sideInputPorts.length) {
@@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     }
   }
 
-  private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
-      TranslationContext context) {
+  private static PCollection<?> unionSideInputs(
+      List<PCollectionView<?>> sideInputs, TranslationContext context) {
     checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
     // flatten and assign union tag
     List<PCollection<Object>> sourceCollections = new ArrayList<>();
@@ -115,13 +132,16 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
     for (int i = 0; i < sideInputs.size(); i++) {
       PCollectionView<?> sideInput = sideInputs.get(i);
       PCollection<?> sideInputCollection = context.getViewInput(sideInput);
-      if (!sideInputCollection.getWindowingStrategy().equals(
-          firstSideInput.getWindowingStrategy())) {
+      if (!sideInputCollection
+          .getWindowingStrategy()
+          .equals(firstSideInput.getWindowingStrategy())) {
         // TODO: check how to handle this in stream codec
         //String msg = "Multiple side inputs with different window strategies.";
         //throw new UnsupportedOperationException(msg);
-        LOG.warn("Side inputs union with different windowing strategies {} {}",
-            firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+        LOG.warn(
+            "Side inputs union with different windowing strategies {} {}",
+            firstSideInput.getWindowingStrategy(),
+            sideInputCollection.getWindowingStrategy());
       }
       if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
         String msg = "Multiple side inputs with different coders.";
@@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
       unionTags.put(sideInputCollection, i);
     }
 
-    PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
-        firstSideInput, firstSideInput.getCoder());
-    FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
-        context);
+    PCollection<Object> resultCollection =
+        FlattenPCollectionTranslator.intermediateCollection(
+            firstSideInput, firstSideInput.getCoder());
+    FlattenPCollectionTranslator.flattenCollections(
+        sourceCollections, unionTags, resultCollection, context);
     return resultCollection;
-
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index c1ebbd5..7a918a7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -19,12 +19,13 @@
 package org.apache.beam.runners.apex.translation;
 
 import java.util.List;
-
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
@@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
-/**
- * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoBoundTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+/** {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */
+class ParDoBoundTranslator<InputT, OutputT>
+    implements TransformTranslator<ParDo.Bound<InputT, OutputT>> {
   private static final long serialVersionUID = 1L;
 
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
-    OldDoFn<InputT, OutputT> doFn = transform.getFn();
+    DoFn<InputT, OutputT> doFn = transform.getNewFn();
+    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+    OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();
     List<PCollectionView<?>> sideInputs = transform.getSideInputs();
     Coder<InputT> inputCoder = input.getCoder();
-    WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
-        input.getWindowingStrategy().getWindowFn().windowCoder());
+    WindowedValueCoder<InputT> wvInputCoder =
+        FullWindowedValueCoder.of(
+            inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
 
-    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
-        context.getPipelineOptions(),
-        doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
-        output.getWindowingStrategy(), sideInputs, wvInputCoder,
-        context.<Void>stateInternalsFactory()
-        );
+    ApexParDoOperator<InputT, OutputT> operator =
+        new ApexParDoOperator<>(
+            context.getPipelineOptions(),
+            oldDoFn,
+            new TupleTag<OutputT>(),
+            TupleTagList.empty().getAll() /*sideOutputTags*/,
+            output.getWindowingStrategy(),
+            sideInputs,
+            wvInputCoder,
+            context.<Void>stateInternalsFactory());
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
     if (!sideInputs.isEmpty()) {
-       ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+      ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
     }
   }
 }


[2/3] incubator-beam git commit: Add JUnit category for stateful ParDo tests

Posted by th...@apache.org.
Add JUnit category for stateful ParDo tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f8b6bb7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f8b6bb7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f8b6bb7f

Branch: refs/heads/master
Commit: f8b6bb7f1ab8720ca4f2d766831d8f243dd27085
Parents: 70efa47
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:13 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:32:47 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/testing/UsesStatefulParDo.java     | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f8b6bb7f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
new file mode 100644
index 0000000..8bd6330
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize stateful {@link ParDo}.
+ */
+public interface UsesStatefulParDo {}


[3/3] incubator-beam git commit: Closes #1410

Posted by th...@apache.org.
Closes #1410


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ec45f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ec45f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ec45f7e

Branch: refs/heads/master
Commit: 6ec45f7e732ede0e2f8256df7a483a017461594d
Parents: e7d7aa9 e85cea7
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 22 11:03:56 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 22 11:03:56 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  1 +
 .../translation/ParDoBoundMultiTranslator.java  | 67 +++++++++++++-------
 .../apex/translation/ParDoBoundTranslator.java  | 46 +++++++++-----
 3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------