You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:41 UTC

[04/13] incubator-beam git commit: Reject timers for ParDo in DirectRunner

Reject timers for ParDo in DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/274f17f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/274f17f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/274f17f0

Branch: refs/heads/master
Commit: 274f17f0c0df08785a78d9a60c22d5556e46584a
Parents: 29f3af3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:37:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/ParDoMultiOverrideFactory.java     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/274f17f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8c96e9b..4e7914f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -50,9 +50,17 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
     DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
     if (signature.processElement().isSplittable()) {
       return new SplittableParDo(transform);
-    } else if (signature.stateDeclarations().size() > 0
-        || signature.timerDeclarations().size() > 0) {
-
+    } else if (signature.timerDeclarations().size() > 0) {
+      // Temporarily actually reject timers
+      throw new UnsupportedOperationException(
+          String.format(
+              "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+              DoFn.TimerId.class.getSimpleName(),
+              fn.getClass().getName(),
+              DoFn.class.getSimpleName(),
+              DirectRunner.class.getSimpleName()));
+
+    } else if (signature.stateDeclarations().size() > 0) {
       // Based on the fact that the signature is stateful, DoFnSignatures ensures
       // that it is also keyed
       ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =