You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/08 18:09:41 UTC
[04/13] incubator-beam git commit: Reject timers for ParDo in DirectRunner
Reject timers for ParDo in DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/274f17f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/274f17f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/274f17f0
Branch: refs/heads/master
Commit: 274f17f0c0df08785a78d9a60c22d5556e46584a
Parents: 29f3af3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 7 20:37:33 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Dec 8 09:53:08 2016 -0800
----------------------------------------------------------------------
.../runners/direct/ParDoMultiOverrideFactory.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/274f17f0/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 8c96e9b..4e7914f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -50,9 +50,17 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
if (signature.processElement().isSplittable()) {
return new SplittableParDo(transform);
- } else if (signature.stateDeclarations().size() > 0
- || signature.timerDeclarations().size() > 0) {
-
+ } else if (signature.timerDeclarations().size() > 0) {
+ // Temporarily actually reject timers
+ throw new UnsupportedOperationException(
+ String.format(
+ "Found %s annotations on %s, but %s cannot yet be used with timers in the %s.",
+ DoFn.TimerId.class.getSimpleName(),
+ fn.getClass().getName(),
+ DoFn.class.getSimpleName(),
+ DirectRunner.class.getSimpleName()));
+
+ } else if (signature.stateDeclarations().size() > 0) {
// Based on the fact that the signature is stateful, DoFnSignatures ensures
// that it is also keyed
ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform =