You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:38 UTC
[01/13] incubator-beam git commit: Disable tests for timers in ParDo
for Apex runner
Repository: incubator-beam
Updated Branches:
refs/heads/master 692905705 -> 96f9fce78
Disable tests for timers in ParDo for Apex runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54c14be1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54c14be1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54c14be1
Branch: refs/heads/master
Commit: 54c14be11549f5733dc43932b08096b33ff74af4
Parents: 96c6a3b
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:52:24 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:07 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54c14be1/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 629e890..b604237 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -187,6 +187,7 @@
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
<excludedGroups>
org.apache.beam.sdk.testing.UsesStatefulParDo,
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
<parallel>none</parallel>
[02/13] incubator-beam git commit: Add JUnit category
UsesTimersInParDo
Posted by ke...@apache.org.
Add JUnit category UsesTimersInParDo
With this, various runners can disable tests for this capability
until they support it.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96c6a3b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96c6a3b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96c6a3b4
Branch: refs/heads/master
Commit: 96c6a3b4cd46c6109ecc532b6e5f7fd47f702c9e
Parents: 6929057
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:15 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:07 2016 -0800
----------------------------------------------------------------------
.../beam/sdk/testing/UsesTimersInParDo.java | 25 ++++++++++++++++++++
1 file changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96c6a3b4/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
new file mode 100644
index 0000000..14123ed
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize timers in {@link ParDo}.
+ */
+public interface UsesTimersInParDo {}
[08/13] incubator-beam git commit: Disable tests for timers in ParDo
for Dataflow runner
Posted by ke...@apache.org.
Disable tests for timers in ParDo for Dataflow runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a8f9565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a8f9565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a8f9565
Branch: refs/heads/master
Commit: 5a8f9565d82dd813428f08d97f975c69b28a58ee
Parents: 320f888
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:53:16 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
runners/google-cloud-dataflow-java/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a8f9565/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 9ead74a..1543c0e 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,6 +78,7 @@
<id>runnable-on-service-tests</id>
<configuration>
<excludedGroups>
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
<excludes>
[07/13] incubator-beam git commit: Disables tests for timers in ParDo
for Spark runner
Posted by ke...@apache.org.
Disables tests for timers in ParDo for Spark runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/320f8881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/320f8881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/320f8881
Branch: refs/heads/master
Commit: 320f8881e91628fa843e0eb2918d2de7a7aae329
Parents: 2f13fe4
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:53:05 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
runners/spark/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/320f8881/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 9a3adf6..d1ef225 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -74,6 +74,7 @@
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
<excludedGroups>
org.apache.beam.sdk.testing.UsesStatefulParDo,
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
<forkCount>1</forkCount>
[03/13] incubator-beam git commit: Add basic test for timers in
ParDoTest
Posted by ke...@apache.org.
Add basic test for timers in ParDoTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50ffc7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50ffc7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50ffc7be
Branch: refs/heads/master
Commit: 50ffc7be7f41a38ce214f30dd76aa56ddbd245aa
Parents: a99dba5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/ParDoTest.java | 49 ++++++++++++++++++++
1 file changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50ffc7be/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 41e795e..36666b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.transforms.DoFn.OnTimer;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1571,6 +1574,52 @@ public class ParDoTest implements Serializable {
p.run();
}
+ /**
+ * Tests that an event time timer fires and results in supplementary output.
+ *
+ * <p>This test relies on two properties:
+ *
+ * <ol>
+ * <li>A timer that is set on time should always get a chance to fire. For this to be true, timers
+ * per-key-and-window must be delivered in order so the timer is not wiped out until the
+ * window is expired by the runner.
+ * <li>A {@link Create} transform sends its elements on time, and later advances the watermark to
+ * infinity
+ * </ol>
+ *
+ * <p>Note that {@link TestStream} is not applicable because it requires very special runner hooks
+ * and is only supported by the direct runner.
+ */
+ @Test
+ @Category({RunnableOnService.class, UsesTimersInParDo.class})
+ public void testSimpleEventTimeTimer() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, Integer> fn =
+ new DoFn<KV<String, Integer>, Integer>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.setForNowPlus(Duration.standardSeconds(1));
+ context.output(3);
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(42);
+ }
+ };
+
+ Pipeline p = TestPipeline.create();
+
+ PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(3, 42);
+ p.run();
+ }
+
@Test
public void testWithOutputTagsDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {
[10/13] incubator-beam git commit: Reject timers for ParDo in
SparkRunner
Posted by ke...@apache.org.
Reject timers for ParDo in SparkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29f3af30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29f3af30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29f3af30
Branch: refs/heads/master
Commit: 29f3af30a4b871244e14998d670b8ca26bd8de94
Parents: 69e0ea2
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:35:08 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../spark/translation/TransformTranslator.java | 46 ++++++++++++--------
1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29f3af30/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index e033ab1..8170366 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,7 +82,6 @@ import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
-
import scala.Tuple2;
@@ -228,20 +228,36 @@ public final class TransformTranslator {
};
}
+ private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ SparkRunner.class.getSimpleName()));
+ }
+ }
+
private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
@Override
public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- SparkRunner.class.getSimpleName()));
- }
+ rejectStateAndTimers(doFn);
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -265,15 +281,7 @@ public final class TransformTranslator {
@Override
public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- SparkRunner.class.getSimpleName()));
- }
+ rejectStateAndTimers(doFn);
@SuppressWarnings("unchecked")
JavaRDD<WindowedValue<InputT>> inRDD =
((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
[12/13] incubator-beam git commit: Reject timers for ParDo in
FlinkRunner
Posted by ke...@apache.org.
Reject timers for ParDo in FlinkRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69e0ea25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69e0ea25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69e0ea25
Branch: refs/heads/master
Commit: 69e0ea25f24597b84c93137dd94e2f25a9b88a15
Parents: 18db3ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:34:59 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../FlinkBatchTransformTranslators.java | 46 ++++++++++++--------
.../FlinkStreamingTransformTranslators.java | 45 +++++++++++--------
2 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 474d4e3..9ac907f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -483,6 +484,30 @@ class FlinkBatchTransformTranslators {
}
}
+ private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+ }
+
private static class ParDoBoundTranslatorBatch<InputT, OutputT>
implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
ParDo.Bound<InputT, OutputT>> {
@@ -493,15 +518,7 @@ class FlinkBatchTransformTranslators {
FlinkBatchTranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- FlinkRunner.class.getSimpleName()));
- }
+ rejectStateAndTimers(doFn);
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
@@ -549,16 +566,7 @@ class FlinkBatchTransformTranslators {
ParDo.BoundMulti<InputT, OutputT> transform,
FlinkBatchTranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- FlinkRunner.class.getSimpleName()));
- }
-
+ rejectStateAndTimers(doFn);
DataSet<WindowedValue<InputT>> inputDataSet =
context.getInputDataSet(context.getInput(transform));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 7b32c76..042f8df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -304,6 +305,30 @@ public class FlinkStreamingTransformTranslators {
}
}
+ private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+ DoFn.StateId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ FlinkRunner.class.getSimpleName()));
+ }
+ }
+
private static class ParDoBoundStreamingTranslator<InputT, OutputT>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
ParDo.Bound<InputT, OutputT>> {
@@ -314,15 +339,7 @@ public class FlinkStreamingTransformTranslators {
FlinkStreamingTranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- FlinkRunner.class.getSimpleName()));
- }
+ rejectStateAndTimers(doFn);
WindowingStrategy<?, ?> windowingStrategy =
context.getOutput(transform).getWindowingStrategy();
@@ -474,15 +491,7 @@ public class FlinkStreamingTransformTranslators {
FlinkStreamingTranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
- throw new UnsupportedOperationException(
- String.format(
- "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
- DoFn.StateId.class.getSimpleName(),
- doFn.getClass().getName(),
- DoFn.class.getSimpleName(),
- FlinkRunner.class.getSimpleName()));
- }
+ rejectStateAndTimers(doFn);
// we assume that the transformation does not change the windowing strategy.
WindowingStrategy<?, ?> windowingStrategy =
[09/13] incubator-beam git commit: No longer reject timers in ParDo
Posted by ke...@apache.org.
No longer reject timers in ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c22e2a43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c22e2a43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c22e2a43
Branch: refs/heads/master
Commit: c22e2a435113c9653b58f1040a4e9266059767f4
Parents: 274f17f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:04:51 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/transforms/ParDo.java | 10 ----------
1 file changed, 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c22e2a43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 4f7491e..e60c536 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -594,16 +594,6 @@ public class ParDo {
private static <InputT, OutputT> void validate(DoFn<InputT, OutputT> fn) {
DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
- // To be removed when the features are complete and runners have their own adequate
- // rejection logic
- if (!signature.timerDeclarations().isEmpty()) {
- throw new UnsupportedOperationException(
- String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
- DoFn.TimerId.class.getSimpleName(),
- fn.getClass().getName(),
- DoFn.class.getSimpleName()));
- }
-
// State is semantically incompatible with splitting
if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
[13/13] incubator-beam git commit: This closes #1550
Posted by ke...@apache.org.
This closes #1550
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96f9fce7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96f9fce7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96f9fce7
Branch: refs/heads/master
Commit: 96f9fce782d7ccc5257eff8993b4f9b8261651a6
Parents: 6929057 c22e2a4
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 09:53:09 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:09 2016 -0800
----------------------------------------------------------------------
runners/apex/pom.xml | 1 +
.../translation/ParDoBoundMultiTranslator.java | 16 ++++++-
.../apex/translation/ParDoBoundTranslator.java | 16 ++++++-
runners/direct-java/pom.xml | 1 +
.../direct/ParDoMultiOverrideFactory.java | 14 ++++--
runners/flink/runner/pom.xml | 1 +
.../FlinkBatchTransformTranslators.java | 46 ++++++++++--------
.../FlinkStreamingTransformTranslators.java | 45 +++++++++++-------
runners/google-cloud-dataflow-java/pom.xml | 1 +
runners/spark/pom.xml | 1 +
.../spark/translation/TransformTranslator.java | 46 ++++++++++--------
.../beam/sdk/testing/UsesTimersInParDo.java | 25 ++++++++++
.../org/apache/beam/sdk/transforms/ParDo.java | 10 ----
.../apache/beam/sdk/transforms/ParDoTest.java | 49 ++++++++++++++++++++
14 files changed, 201 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
[06/13] incubator-beam git commit: Disable tests for timers in ParDo
for Flink runner
Posted by ke...@apache.org.
Disable tests for timers in ParDo for Flink runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f13fe41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f13fe41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f13fe41
Branch: refs/heads/master
Commit: 2f13fe412c20f21b711b7686cdd18c68efbdd038
Parents: 54c14be
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:52:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
runners/flink/runner/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f13fe41/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 615d5f1..9509476 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -55,6 +55,7 @@
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
<excludedGroups>
org.apache.beam.sdk.testing.UsesStatefulParDo,
+ org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo
</excludedGroups>
<parallel>none</parallel>
[04/13] incubator-beam git commit: Reject timers for ParDo in
DirectRunner
Posted by ke...@apache.org.
Reject timers for ParDo in DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/274f17f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/274f17f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/274f17f0
Branch: refs/heads/master
Commit: 274f17f0c0df08785a78d9a60c22d5556e46584a
Parents: 29f3af3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:37:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../runners/direct/ParDoMultiOverrideFactory.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/274f17f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8c96e9b..4e7914f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -50,9 +50,17 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
return new SplittableParDo(transform);
- } else if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
-
+ } else if (signature.timerDeclarations().size() > 0) {
+ // Temporarily actually reject timers
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ fn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ DirectRunner.class.getSimpleName()));
+
+ } else if (signature.stateDeclarations().size() > 0) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =
[11/13] incubator-beam git commit: Disable tests for timers in ParDo
for direct runner
Posted by ke...@apache.org.
Disable tests for timers in ParDo for direct runner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a99dba59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a99dba59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a99dba59
Branch: refs/heads/master
Commit: a99dba5955813a092e78fb438a1b5b96480fae3a
Parents: 5a8f956
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:24:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
runners/direct-java/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a99dba59/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 79f3de6..c47f532 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -68,6 +68,7 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
+ <excludedGroups>org.apache.beam.sdk.testing.UsesTimersInParDo</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
[05/13] incubator-beam git commit: Reject timers for ParDo in
ApexRunner
Posted by ke...@apache.org.
Reject timers for ParDo in ApexRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18db3ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18db3ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18db3ace
Branch: refs/heads/master
Commit: 18db3ace77e89203d7ec3f342fe6ce24a2119226
Parents: 50ffc7b
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:34:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../apex/translation/ParDoBoundMultiTranslator.java | 16 +++++++++++++++-
.../apex/translation/ParDoBoundTranslator.java | 16 +++++++++++++++-
2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index fed5f4b..706482a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -54,7 +55,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
@Override
public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -63,6 +66,17 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
DoFn.class.getSimpleName(),
ApexRunner.class.getSimpleName()));
}
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+
OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
PCollectionTuple output = context.getOutput();
PCollection<InputT> input = context.getInput();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index 7a918a7..b5a50f6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -41,7 +42,9 @@ class ParDoBoundTranslator<InputT, OutputT>
@Override
public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
DoFn<InputT, OutputT> doFn = transform.getNewFn();
- if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+ DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+ if (signature.stateDeclarations().size() > 0) {
throw new UnsupportedOperationException(
String.format(
"Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -50,6 +53,17 @@ class ParDoBoundTranslator<InputT, OutputT>
DoFn.class.getSimpleName(),
ApexRunner.class.getSimpleName()));
}
+
+ if (signature.timerDeclarations().size() > 0) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ doFn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ ApexRunner.class.getSimpleName()));
+ }
+
OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
PCollection<OutputT> output = context.getOutput();
PCollection<InputT> input = context.getInput();