You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 01:02:14 UTC
[5/6] incubator-beam git commit: Reject all timers in ParDo, for now
Reject all timers in ParDo, for now
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f9712f2b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f9712f2b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f9712f2b
Branch: refs/heads/master
Commit: f9712f2bacb9aac9d5df5c6021bb3cfb59758806
Parents: ccefc6f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 18 13:09:57 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Oct 19 17:52:21 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/ParDo.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f9712f2b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 776f768..8aa87e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -563,12 +563,29 @@ public class ParDo {
DoFn.class.getSimpleName()));
}
+ // To be removed when the features are complete and runners have their own adequate
+ // rejection logic
+ if (!signature.timerDeclarations().isEmpty()) {
+ throw new UnsupportedOperationException(
+ String.format("Found %s annotations on %s, but %s cannot yet be used with timers.",
+ DoFn.TimerId.class.getSimpleName(),
+ fn.getClass().getName(),
+ DoFn.class.getSimpleName()));
+ }
+
// State is semantically incompatible with splitting
if (!signature.stateDeclarations().isEmpty() && signature.processElement().isSplittable()) {
throw new UnsupportedOperationException(
String.format("%s is splittable and uses state, but these are not compatible",
fn.getClass().getName()));
}
+
+ // Timers are semantically incompatible with splitting
+ if (!signature.timerDeclarations().isEmpty() && signature.processElement().isSplittable()) {
+ throw new UnsupportedOperationException(
+ String.format("%s is splittable and uses timers, but these are not compatible",
+ fn.getClass().getName()));
+ }
}
private static <InputT, OutputT> OldDoFn<InputT, OutputT> adapt(DoFn<InputT, OutputT> fn) {