You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/11/22 19:06:27 UTC
[1/3] incubator-beam git commit: Reject stateful DoFn in ApexRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master e7d7aa938 -> 6ec45f7e7
Reject stateful DoFn in ApexRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e85cea78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e85cea78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e85cea78
Branch: refs/heads/master
Commit: e85cea78253d2f316a18d95d65aabc1176448841
Parents: f8b6bb7
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:01 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:32:47 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 1 +
.../translation/ParDoBoundMultiTranslator.java | 67 +++++++++++++-------
.../apex/translation/ParDoBoundTranslator.java | 46 +++++++++-----
3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 5478b24..d0b0fdf 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -185,6 +185,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index 7c91b91..fed5f4b 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -23,17 +23,17 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.OutputPort;
import com.google.common.collect.Maps;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
@@ -53,20 +53,35 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
- OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+ OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
PCollectionTuple output = context.getOutput();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
- WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
+ WindowedValueCoder<InputT> wvInputCoder =
+ FullWindowedValueCoder.of(
+ inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn, transform.getMainOutputTag(), transform.getSideOutputTags().getAll(),
- context.<PCollection<?>>getInput().getWindowingStrategy(), sideInputs, wvInputCoder,
- context.<Void>stateInternalsFactory()
- );
+ ApexParDoOperator<InputT, OutputT> operator =
+ new ApexParDoOperator<>(
+ context.getPipelineOptions(),
+ oldDoFn,
+ transform.getMainOutputTag(),
+ transform.getSideOutputTags().getAll(),
+ context.<PCollection<?>>getInput().getWindowingStrategy(),
+ sideInputs,
+ wvInputCoder,
+ context.<Void>stateInternalsFactory());
Map<TupleTag<?>, PCollection<?>> outputs = output.getAll();
Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size());
@@ -91,7 +106,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
}
}
- static void addSideInputs(ApexParDoOperator<?, ?> operator, List<PCollectionView<?>> sideInputs,
+ static void addSideInputs(
+ ApexParDoOperator<?, ?> operator,
+ List<PCollectionView<?>> sideInputs,
TranslationContext context) {
Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
if (sideInputs.size() > sideInputPorts.length) {
@@ -105,8 +122,8 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
}
}
- private static PCollection<?> unionSideInputs(List<PCollectionView<?>> sideInputs,
- TranslationContext context) {
+ private static PCollection<?> unionSideInputs(
+ List<PCollectionView<?>> sideInputs, TranslationContext context) {
checkArgument(sideInputs.size() > 1, "requires multiple side inputs");
// flatten and assign union tag
List<PCollection<Object>> sourceCollections = new ArrayList<>();
@@ -115,13 +132,16 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
for (int i = 0; i < sideInputs.size(); i++) {
PCollectionView<?> sideInput = sideInputs.get(i);
PCollection<?> sideInputCollection = context.getViewInput(sideInput);
- if (!sideInputCollection.getWindowingStrategy().equals(
- firstSideInput.getWindowingStrategy())) {
+ if (!sideInputCollection
+ .getWindowingStrategy()
+ .equals(firstSideInput.getWindowingStrategy())) {
// TODO: check how to handle this in stream codec
//String msg = "Multiple side inputs with different window strategies.";
//throw new UnsupportedOperationException(msg);
- LOG.warn("Side inputs union with different windowing strategies {} {}",
- firstSideInput.getWindowingStrategy(), sideInputCollection.getWindowingStrategy());
+ LOG.warn(
+ "Side inputs union with different windowing strategies {} {}",
+ firstSideInput.getWindowingStrategy(),
+ sideInputCollection.getWindowingStrategy());
}
if (!sideInputCollection.getCoder().equals(firstSideInput.getCoder())) {
String msg = "Multiple side inputs with different coders.";
@@ -131,12 +151,11 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
unionTags.put(sideInputCollection, i);
}
- PCollection<Object> resultCollection = FlattenPCollectionTranslator.intermediateCollection(
- firstSideInput, firstSideInput.getCoder());
- FlattenPCollectionTranslator.flattenCollections(sourceCollections, unionTags, resultCollection,
- context);
+ PCollection<Object> resultCollection =
+ FlattenPCollectionTranslator.intermediateCollection(
+ firstSideInput, firstSideInput.getCoder());
+ FlattenPCollectionTranslator.flattenCollections(
+ sourceCollections, unionTags, resultCollection, context);
return resultCollection;
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e85cea78/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index c1ebbd5..7a918a7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -19,12 +19,13 @@
package org.apache.beam.runners.apex.translation;
import java.util.List;
-
+import org.apache.beam.runners.apex.ApexRunner;
import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollection;
@@ -32,33 +33,46 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-/**
- * {@link ParDo.Bound} is translated to {link ApexParDoOperator} that wraps the {@link DoFn}.
- */
-class ParDoBoundTranslator<InputT, OutputT> implements
- TransformTranslator<ParDo.Bound<InputT, OutputT>> {
+/** {@link ParDo.Bound} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */
+class ParDoBoundTranslator<InputT, OutputT>
+ implements TransformTranslator<ParDo.Bound<InputT, OutputT>> {
private static final long serialVersionUID = 1L;
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
- OldDoFn<InputT, OutputT> doFn = transform.getFn();
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+ OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
PCollection<OutputT> output = context.getOutput();
PCollection<InputT> input = context.getInput();
List<PCollectionView<?>> sideInputs = transform.getSideInputs();
Coder<InputT> inputCoder = input.getCoder();
- WindowedValueCoder<InputT> wvInputCoder = FullWindowedValueCoder.of(inputCoder,
- input.getWindowingStrategy().getWindowFn().windowCoder());
+ WindowedValueCoder<InputT> wvInputCoder =
+ FullWindowedValueCoder.of(
+ inputCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(
- context.getPipelineOptions(),
- doFn, new TupleTag<OutputT>(), TupleTagList.empty().getAll() /*sideOutputTags*/,
- output.getWindowingStrategy(), sideInputs, wvInputCoder,
- context.<Void>stateInternalsFactory()
- );
+ ApexParDoOperator<InputT, OutputT> operator =
+ new ApexParDoOperator<>(
+ context.getPipelineOptions(),
+ oldDoFn,
+ new TupleTag<OutputT>(),
+ TupleTagList.empty().getAll() /*sideOutputTags*/,
+ output.getWindowingStrategy(),
+ sideInputs,
+ wvInputCoder,
+ context.<Void>stateInternalsFactory());
context.addOperator(operator, operator.output);
context.addStream(context.getInput(), operator.input);
if (!sideInputs.isEmpty()) {
- ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
+ ParDoBoundMultiTranslator.addSideInputs(operator, sideInputs, context);
}
}
}
[2/3] incubator-beam git commit: Add JUnit category for stateful ParDo tests
Posted by th...@apache.org.
Add JUnit category for stateful ParDo tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f8b6bb7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f8b6bb7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f8b6bb7f
Branch: refs/heads/master
Commit: f8b6bb7f1ab8720ca4f2d766831d8f243dd27085
Parents: 70efa47
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:13 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:32:47 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/testing/UsesStatefulParDo.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f8b6bb7f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
new file mode 100644
index 0000000..8bd6330
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize stateful {@link ParDo}.
+ */
+public interface UsesStatefulParDo {}
[3/3] incubator-beam git commit: Closes #1410
Posted by th...@apache.org.
Closes #1410
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ec45f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ec45f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ec45f7e
Branch: refs/heads/master
Commit: 6ec45f7e732ede0e2f8256df7a483a017461594d
Parents: e7d7aa9 e85cea7
Author: Thomas Weise <th...@apache.org>
Authored: Tue Nov 22 11:03:56 2016 -0800
Committer: Thomas Weise <th...@apache.org>
Committed: Tue Nov 22 11:03:56 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 1 +
.../translation/ParDoBoundMultiTranslator.java | 67 +++++++++++++-------
.../apex/translation/ParDoBoundTranslator.java | 46 +++++++++-----
3 files changed, 74 insertions(+), 40 deletions(-)
----------------------------------------------------------------------