You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 01:02:14 UTC

[5/6] incubator-beam git commit: Reject all timers in ParDo, for now

Reject all timers in ParDo, for now


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9712f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9712f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9712f2b

Branch: refs/heads/master
Commit: f9712f2bacb9aac9d5df5c6021bb3cfb59758806
Parents: ccefc6f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 18 13:09:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/ParDo.java | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9712f2b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 776f768..8aa87e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -563,12 +563,29 @@ public class ParDo {
               DoFn.class.getSimpleName()));
     }
 
+    // To be removed when the features are complete and runners have their own adequate
+    // rejection logic
+    if (!signature.timerDeclarations().isEmpty()) {
+      throw new UnsupportedOperationException(
+          String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
+              DoFn.TimerId.class.getSimpleName(),
+              fn.getClass().getName(),
+              DoFn.class.getSimpleName()));
+    }
+
     // State is semantically incompatible with splitting
     if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
       throw new UnsupportedOperationException(
           String.format("%s is splittable and uses state, but these are not compatible",
               fn.getClass().getName()));
     }
+
+    // Timers are semantically incompatible with splitting
+    if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
+      throw new UnsupportedOperationException(
+          String.format("%s is splittable and uses timers, but these are not compatible",
+              fn.getClass().getName()));
+    }
   }
 
   private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {