You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/22 18:51:33 UTC
[1/3] incubator-beam git commit: Add JUnit category for stateful
ParDo tests
Repository: incubator-beam
Updated Branches:
refs/heads/master c2dc38639 -> 7949b7082
Add JUnit category for stateful ParDo tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d715689
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d715689
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d715689
Branch: refs/heads/master
Commit: 8d715689dd5283b7b180c0b9ec4e188abba140f5
Parents: 70efa47
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Nov 21 15:41:13 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:34:19 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/testing/UsesStatefulParDo.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d715689/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
new file mode 100644
index 0000000..8bd6330
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStatefulParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize stateful {@link ParDo}.
+ */
+public interface UsesStatefulParDo {}
[3/3] incubator-beam git commit: This closes #1409
Posted by ke...@apache.org.
This closes #1409
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7949b708
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7949b708
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7949b708
Branch: refs/heads/master
Commit: 7949b708257c6e1dbd86db99bb5e10e2e47d33aa
Parents: c2dc386 b0d07d7
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 22 10:51:14 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Nov 22 10:51:14 2016 -0800
----------------------------------------------------------------------
runners/spark/pom.xml | 1 +
.../spark/translation/TransformTranslator.java | 23 ++++++++++++++++++
.../beam/sdk/testing/UsesStatefulParDo.java | 25 ++++++++++++++++++++
3 files changed, 49 insertions(+)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Reject stateful DoFn in SparkRunner
Posted by ke...@apache.org.
Reject stateful DoFn in SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b0d07d74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b0d07d74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b0d07d74
Branch: refs/heads/master
Commit: b0d07d74f7805ee1d30fdedf54c089790d63d898
Parents: 8d71568
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Nov 15 21:33:13 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Nov 21 21:34:20 2016 -0800
----------------------------------------------------------------------
runners/spark/pom.xml | 1 +
.../spark/translation/TransformTranslator.java | 23 ++++++++++++++++++++
2 files changed, 24 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4c5b3f5..88223e2 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -72,6 +72,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesStatefulParDo</excludedGroups>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<failIfNoTests>true</failIfNoTests>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b0d07d74/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index c902ee3..60d668e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -31,6 +31,7 @@ import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
import org.apache.beam.runners.spark.io.SourceRDD;
@@ -47,12 +48,14 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -225,6 +228,16 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -247,6 +260,16 @@ public final class TransformTranslator {
return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
+ DoFn<InputT, OutputT> doFn = transform.getNewFn();
+ if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();