You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:38 UTC

[01/13] incubator-beam git commit: Disable tests for timers in ParDo for Apex runner

Repository: incubator-beam
Updated Branches:
  refs/heads/master 692905705 -> 96f9fce78


Disable tests for timers in ParDo for Apex runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54c14be1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54c14be1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54c14be1

Branch: refs/heads/master
Commit: 54c14be11549f5733dc43932b08096b33ff74af4
Parents: 96c6a3b
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:52:24 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:07 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54c14be1/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 629e890..b604237 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -187,6 +187,7 @@
               <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
               <excludedGroups>
                 org.apache.beam.sdk.testing.UsesStatefulParDo,
+                org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo
               </excludedGroups>
               <parallel>none</parallel>


[02/13] incubator-beam git commit: Add JUnit category UsesTimersInParDo

Posted by ke...@apache.org.
Add JUnit category UsesTimersInParDo

With this, various runners can disable tests for this capability
until they support it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96c6a3b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96c6a3b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96c6a3b4

Branch: refs/heads/master
Commit: 96c6a3b4cd46c6109ecc532b6e5f7fd47f702c9e
Parents: 6929057
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:15 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:07 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/testing/UsesTimersInParDo.java     | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/96c6a3b4/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
new file mode 100644
index 0000000..14123ed
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesTimersInParDo.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.ParDo;
+
+/**
+ * Category tag for validation tests which utilize timers in {@link ParDo}.
+ */
+public interface UsesTimersInParDo {}


[08/13] incubator-beam git commit: Disable tests for timers in ParDo for Dataflow runner

Posted by ke...@apache.org.
Disable tests for timers in ParDo for Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5a8f9565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5a8f9565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5a8f9565

Branch: refs/heads/master
Commit: 5a8f9565d82dd813428f08d97f975c69b28a58ee
Parents: 320f888
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:53:16 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5a8f9565/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 9ead74a..1543c0e 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -78,6 +78,7 @@
             <id>runnable-on-service-tests</id>
             <configuration>
               <excludedGroups>
+                org.apache.beam.sdk.testing.UsesTimersInParDo,
                 org.apache.beam.sdk.testing.UsesSplittableParDo
               </excludedGroups>
               <excludes>


[07/13] incubator-beam git commit: Disables tests for timers in ParDo for Spark runner

Posted by ke...@apache.org.
Disables tests for timers in ParDo for Spark runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/320f8881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/320f8881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/320f8881

Branch: refs/heads/master
Commit: 320f8881e91628fa843e0eb2918d2de7a7aae329
Parents: 2f13fe4
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:53:05 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 runners/spark/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/320f8881/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 9a3adf6..d1ef225 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -74,6 +74,7 @@
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
+                    org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo
                   </excludedGroups>
                   <forkCount>1</forkCount>


[03/13] incubator-beam git commit: Add basic test for timers in ParDoTest

Posted by ke...@apache.org.
Add basic test for timers in ParDoTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/50ffc7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/50ffc7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/50ffc7be

Branch: refs/heads/master
Commit: 50ffc7be7f41a38ce214f30dd76aa56ddbd245aa
Parents: a99dba5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:49:40 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 49 ++++++++++++++++++++
 1 file changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/50ffc7be/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 41e795e..36666b2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -55,7 +55,9 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
+import org.apache.beam.sdk.testing.UsesTimersInParDo;
 import org.apache.beam.sdk.transforms.DoFn.OnTimer;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
@@ -69,6 +71,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1571,6 +1574,52 @@ public class ParDoTest implements Serializable {
     p.run();
   }
 
+  /**
+   * Tests that an event time timer fires and results in supplementary output.
+   *
+   * <p>This test relies on two properties:
+   *
+   * <ol>
+   * <li>A timer that is set on time should always get a chance to fire. For this to be true, timers
+   *     per-key-and-window must be delivered in order so the timer is not wiped out until the
+   *     window is expired by the runner.
+   * <li>A {@link Create} transform sends its elements on time, and later advances the watermark to
+   *     infinity.
+   * </ol>
+   *
+   * <p>Note that {@link TestStream} is not applicable because it requires very special runner hooks
+   * and is only supported by the direct runner.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testSimpleEventTimeTimer() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+            context.output(3);
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(42);
+          }
+        };
+
+    Pipeline p = TestPipeline.create();
+
+    PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(3, 42);
+    p.run();
+  }
+
   @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {


[10/13] incubator-beam git commit: Reject timers for ParDo in SparkRunner

Posted by ke...@apache.org.
Reject timers for ParDo in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29f3af30
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29f3af30
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29f3af30

Branch: refs/heads/master
Commit: 29f3af30a4b871244e14998d670b8ca26bd8de94
Parents: 69e0ea2
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:35:08 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../spark/translation/TransformTranslator.java  | 46 ++++++++++++--------
 1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29f3af30/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index e033ab1..8170366 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -58,6 +58,7 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,7 +82,6 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
-
 import scala.Tuple2;
 
 
@@ -228,20 +228,36 @@ public final class TransformTranslator {
     };
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              SparkRunner.class.getSimpleName()));
+    }
+  }
+
   private static <InputT, OutputT> TransformEvaluator<ParDo.Bound<InputT, OutputT>> parDo() {
     return new TransformEvaluator<ParDo.Bound<InputT, OutputT>>() {
       @Override
       public void evaluate(ParDo.Bound<InputT, OutputT> transform, EvaluationContext context) {
         DoFn<InputT, OutputT> doFn = transform.getNewFn();
-        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                  DoFn.StateId.class.getSimpleName(),
-                  doFn.getClass().getName(),
-                  DoFn.class.getSimpleName(),
-                  SparkRunner.class.getSimpleName()));
-        }
+        rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();
@@ -265,15 +281,7 @@ public final class TransformTranslator {
       @Override
       public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) {
         DoFn<InputT, OutputT> doFn = transform.getNewFn();
-        if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-          throw new UnsupportedOperationException(
-              String.format(
-                  "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                  DoFn.StateId.class.getSimpleName(),
-                  doFn.getClass().getName(),
-                  DoFn.class.getSimpleName(),
-                  SparkRunner.class.getSimpleName()));
-        }
+        rejectStateAndTimers(doFn);
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<InputT>> inRDD =
             ((BoundedDataset<InputT>) context.borrowDataset(transform)).getRDD();


[12/13] incubator-beam git commit: Reject timers for ParDo in FlinkRunner

Posted by ke...@apache.org.
Reject timers for ParDo in FlinkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69e0ea25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69e0ea25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69e0ea25

Branch: refs/heads/master
Commit: 69e0ea25f24597b84c93137dd94e2f25a9b88a15
Parents: 18db3ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:34:59 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         | 46 ++++++++++++--------
 .../FlinkStreamingTransformTranslators.java     | 45 +++++++++++--------
 2 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 474d4e3..9ac907f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -483,6 +484,30 @@ class FlinkBatchTransformTranslators {
     }
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+  }
+
   private static class ParDoBoundTranslatorBatch<InputT, OutputT>
       implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
           ParDo.Bound<InputT, OutputT>> {
@@ -493,15 +518,7 @@ class FlinkBatchTransformTranslators {
 
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
@@ -549,16 +566,7 @@ class FlinkBatchTransformTranslators {
         ParDo.BoundMulti<InputT, OutputT> transform,
         FlinkBatchTranslationContext context) {
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
-
+      rejectStateAndTimers(doFn);
       DataSet<WindowedValue<InputT>> inputDataSet =
           context.getInputDataSet(context.getInput(transform));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69e0ea25/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 7b32c76..042f8df 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.join.UnionCoder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -304,6 +305,30 @@ public class FlinkStreamingTransformTranslators {
     }
   }
 
+  private static void rejectStateAndTimers(DoFn<?, ?> doFn) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
+              DoFn.StateId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              FlinkRunner.class.getSimpleName()));
+    }
+  }
+
   private static class ParDoBoundStreamingTranslator<InputT, OutputT>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
         ParDo.Bound<InputT, OutputT>> {
@@ -314,15 +339,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       WindowingStrategy<?, ?> windowingStrategy =
           context.getOutput(transform).getWindowingStrategy();
@@ -474,15 +491,7 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
 
       DoFn<InputT, OutputT> doFn = transform.getNewFn();
-      if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
-        throw new UnsupportedOperationException(
-            String.format(
-                "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
-                DoFn.StateId.class.getSimpleName(),
-                doFn.getClass().getName(),
-                DoFn.class.getSimpleName(),
-                FlinkRunner.class.getSimpleName()));
-      }
+      rejectStateAndTimers(doFn);
 
       // we assume that the transformation does not change the windowing strategy.
       WindowingStrategy<?, ?> windowingStrategy =


[09/13] incubator-beam git commit: No longer reject timers in ParDo

Posted by ke...@apache.org.
No longer reject timers in ParDo


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c22e2a43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c22e2a43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c22e2a43

Branch: refs/heads/master
Commit: c22e2a435113c9653b58f1040a4e9266059767f4
Parents: 274f17f
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:04:51 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/transforms/ParDo.java   | 10 ----------
 1 file changed, 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c22e2a43/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 4f7491e..e60c536 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -594,16 +594,6 @@ public class ParDo {
   private static <InputT, OutputT> void validate(DoFn<InputT, OutputT> fn) {
     DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass());
 
-    // To be removed when the features are complete and runners have their own adequate
-    // rejection logic
-    if (!signature.timerDeclarations().isEmpty()) {
-      throw new UnsupportedOperationException(
-          String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
-              DoFn.TimerId.class.getSimpleName(),
-              fn.getClass().getName(),
-              DoFn.class.getSimpleName()));
-    }
-
     // State is semantically incompatible with splitting
     if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException(


[13/13] incubator-beam git commit: This closes #1550

Posted by ke...@apache.org.
This closes #1550


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96f9fce7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96f9fce7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96f9fce7

Branch: refs/heads/master
Commit: 96f9fce782d7ccc5257eff8993b4f9b8261651a6
Parents: 6929057 c22e2a4
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Dec 8 09:53:09 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:09 2016 -0800

----------------------------------------------------------------------
 runners/apex/pom.xml                            |  1 +
 .../translation/ParDoBoundMultiTranslator.java  | 16 ++++++-
 .../apex/translation/ParDoBoundTranslator.java  | 16 ++++++-
 runners/direct-java/pom.xml                     |  1 +
 .../direct/ParDoMultiOverrideFactory.java       | 14 ++++--
 runners/flink/runner/pom.xml                    |  1 +
 .../FlinkBatchTransformTranslators.java         | 46 ++++++++++--------
 .../FlinkStreamingTransformTranslators.java     | 45 +++++++++++-------
 runners/google-cloud-dataflow-java/pom.xml      |  1 +
 runners/spark/pom.xml                           |  1 +
 .../spark/translation/TransformTranslator.java  | 46 ++++++++++--------
 .../beam/sdk/testing/UsesTimersInParDo.java     | 25 ++++++++++
 .../org/apache/beam/sdk/transforms/ParDo.java   | 10 ----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 49 ++++++++++++++++++++
 14 files changed, 201 insertions(+), 71 deletions(-)
----------------------------------------------------------------------



[06/13] incubator-beam git commit: Disable tests for timers in ParDo for Flink runner

Posted by ke...@apache.org.
Disable tests for timers in ParDo for Flink runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f13fe41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f13fe41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f13fe41

Branch: refs/heads/master
Commit: 2f13fe412c20f21b711b7686cdd18c68efbdd038
Parents: 54c14be
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 6 20:52:49 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 runners/flink/runner/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f13fe41/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index 615d5f1..9509476 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -55,6 +55,7 @@
                   <groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
                   <excludedGroups>
                     org.apache.beam.sdk.testing.UsesStatefulParDo,
+                    org.apache.beam.sdk.testing.UsesTimersInParDo,
                     org.apache.beam.sdk.testing.UsesSplittableParDo
                   </excludedGroups>
                   <parallel>none</parallel>


[04/13] incubator-beam git commit: Reject timers for ParDo in DirectRunner

Posted by ke...@apache.org.
Reject timers for ParDo in DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/274f17f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/274f17f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/274f17f0

Branch: refs/heads/master
Commit: 274f17f0c0df08785a78d9a60c22d5556e46584a
Parents: 29f3af3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:37:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/ParDoMultiOverrideFactory.java     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/274f17f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8c96e9b..4e7914f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -50,9 +50,17 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
       return new SplittableParDo(transform);
-    } else if (signature.stateDeclarations().size() > 0
-        || signature.timerDeclarations().size() > 0) {
-
+    } else if (signature.timerDeclarations().size() > 0) {
+      // Temporarily actually reject timers
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              fn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              DirectRunner.class.getSimpleName()));
+
+    } else if (signature.stateDeclarations().size() > 0) {
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =


[11/13] incubator-beam git commit: Disable tests for timers in ParDo for direct runner

Posted by ke...@apache.org.
Disable tests for timers in ParDo for direct runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a99dba59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a99dba59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a99dba59

Branch: refs/heads/master
Commit: a99dba5955813a092e78fb438a1b5b96480fae3a
Parents: 5a8f956
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:24:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 runners/direct-java/pom.xml | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a99dba59/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 79f3de6..c47f532 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -68,6 +68,7 @@
             </goals>
             <configuration>
               <groups>org.apache.beam.sdk.testing.NeedsRunner</groups>
+              <excludedGroups>org.apache.beam.sdk.testing.UsesTimersInParDo</excludedGroups>
               <parallel>none</parallel>
               <failIfNoTests>true</failIfNoTests>
               <dependenciesToScan>


[05/13] incubator-beam git commit: Reject timers for ParDo in ApexRunner

Posted by ke...@apache.org.
Reject timers for ParDo in ApexRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/18db3ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/18db3ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/18db3ace

Branch: refs/heads/master
Commit: 18db3ace77e89203d7ec3f342fe6ce24a2119226
Parents: 50ffc7b
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:34:34 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../apex/translation/ParDoBoundMultiTranslator.java | 16 +++++++++++++++-
 .../apex/translation/ParDoBoundTranslator.java      | 16 +++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
index fed5f4b..706482a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundMultiTranslator.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -54,7 +55,9 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
   @Override
   public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getNewFn();
-    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
               "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -63,6 +66,17 @@ class ParDoBoundMultiTranslator<InputT, OutputT>
               DoFn.class.getSimpleName(),
               ApexRunner.class.getSimpleName()));
     }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+
     OldDoFn<InputT, OutputT> oldDoFn = transform.getFn();
     PCollectionTuple output = context.getOutput();
     PCollection<InputT> input = context.getInput();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/18db3ace/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
index 7a918a7..b5a50f6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoBoundTranslator.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -41,7 +42,9 @@ class ParDoBoundTranslator<InputT, OutputT>
   @Override
   public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     DoFn<InputT, OutputT> doFn = transform.getNewFn();
-    if (DoFnSignatures.getSignature(doFn.getClass()).stateDeclarations().size() > 0) {
+    DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
+
+    if (signature.stateDeclarations().size() > 0) {
       throw new UnsupportedOperationException(
           String.format(
               "Found %s annotations on %s, but %s cannot yet be used with state in the %s.",
@@ -50,6 +53,17 @@ class ParDoBoundTranslator<InputT, OutputT>
               DoFn.class.getSimpleName(),
               ApexRunner.class.getSimpleName()));
     }
+
+    if (signature.timerDeclarations().size() > 0) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              doFn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              ApexRunner.class.getSimpleName()));
+    }
+
     OldDoFn<InputT, OutputT> oldDoFn = transform.getOldFn();
     PCollection<OutputT> output = context.getOutput();
     PCollection<InputT> input = context.getInput();