You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:32 UTC
[20/50] [abbrv] beam git commit: Reject stateful ParDo if coder not
KvCoder with deterministic key coder
Reject stateful ParDo if coder not KvCoder with deterministic key coder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8974672
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8974672
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8974672
Branch: refs/heads/DSL_SQL
Commit: f89746722419ef3c60f92d7a0fa17e4e6247b265
Parents: 81a96ab
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 5 17:24:25 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/ParDo.java | 27 +++++
.../apache/beam/sdk/transforms/ParDoTest.java | 102 +++++++++++++++++++
2 files changed, 129 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index db1f791..0d03835 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -455,6 +456,27 @@ public class ParDo {
}
}
+  /**
+   * Checks that {@code input} is suitable for a {@link DoFn} that uses state or timers.
+   *
+   * <p>The input must be encoded with a {@link KvCoder}, and that coder's key coder must be
+   * deterministic.
+   *
+   * @param fn the {@link DoFn} being applied; not currently consulted by this check
+   * @param input the {@link PCollection} the {@link DoFn} is applied to
+   * @throws IllegalArgumentException if the input coder is not a {@link KvCoder}, or if its key
+   *     coder is not deterministic; in the latter case the
+   *     {@link Coder.NonDeterministicException} is attached as the cause
+   */
+  private static void validateStateApplicableForInput(
+      DoFn<?, ?> fn,
+      PCollection<?> input) {
+    Coder<?> inputCoder = input.getCoder();
+    checkArgument(
+        inputCoder instanceof KvCoder,
+        "%s requires its input to use %s in order to use state and timers, but the input coder"
+            + " is %s.",
+        ParDo.class.getSimpleName(),
+        KvCoder.class.getSimpleName(),
+        inputCoder);
+
+    Coder<?> keyCoder = ((KvCoder<?, ?>) inputCoder).getKeyCoder();
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (Coder.NonDeterministicException exc) {
+      // Chain the original exception so the underlying reason is not lost.
+      throw new IllegalArgumentException(
+          String.format(
+              "%s requires a deterministic key coder in order to use state and timers,"
+                  + " but key coder %s is not deterministic.",
+              ParDo.class.getSimpleName(),
+              keyCoder),
+          exc);
+    }
+  }
+
/**
* Try to provide coders for as many of the type arguments of given
* {@link DoFnSignature.StateDeclaration} as possible.
@@ -737,6 +759,11 @@ public class ParDo {
// Use coder registry to determine coders for all StateSpec defined in the fn signature.
finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
+ DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+ if (signature.usesState() || signature.usesTimers()) {
+ validateStateApplicableForInput(fn, input);
+ }
+
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),
http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5b60ef3..fa4949e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1593,6 +1593,108 @@ public class ParDoTest implements Serializable {
}
@Test
+  public void testStateNotKeyed() {
+    final String stateId = "foo";
+
+    // Only applying ParDo to the unkeyed input is expected to fail; constructing the DoFn and
+    // the Create source does not, so the expectations can be registered up front.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("state");
+    thrown.expectMessage("KvCoder");
+
+    DoFn<String, Integer> unkeyedStatefulFn =
+        new DoFn<String, Integer>() {
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> valueSpec = StateSpecs.value();
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context, @StateId(stateId) ValueState<Integer> value) {}
+        };
+
+    pipeline
+        .apply(Create.of("hello", "goodbye", "hello again"))
+        .apply(ParDo.of(unkeyedStatefulFn));
+  }
+
+  @Test
+  public void testStateNotDeterministic() {
+    final String stateId = "foo";
+
+    // Keys are Doubles and DoubleCoder is not deterministic, so applying this stateful DoFn
+    // must be rejected.
+    DoFn<KV<Double, String>, Integer> doubleKeyedFn =
+        new DoFn<KV<Double, String>, Integer>() {
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> valueSpec = StateSpecs.value();
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext context, @StateId(stateId) ValueState<Integer> value) {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("state");
+    thrown.expectMessage("deterministic");
+
+    pipeline
+        .apply(
+            Create.of(
+                KV.of(1.0, "hello"),
+                KV.of(5.4, "goodbye"),
+                KV.of(7.2, "hello again")))
+        .apply(ParDo.of(doubleKeyedFn));
+  }
+
+  @Test
+  public void testTimerNotKeyed() {
+    final String timerId = "foo";
+
+    // The input elements are plain Strings rather than KVs, so applying a DoFn that declares a
+    // timer to them is expected to be rejected.
+    DoFn<String, Integer> unkeyedTimerFn =
+        new DoFn<String, Integer>() {
+          @TimerId(timerId)
+          private final TimerSpec eventTimeTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer t) {}
+
+          @OnTimer(timerId)
+          public void onTimer() {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("timer");
+    thrown.expectMessage("KvCoder");
+
+    pipeline
+        .apply(Create.of("hello", "goodbye", "hello again"))
+        .apply(ParDo.of(unkeyedTimerFn));
+  }
+
+  @Test
+  public void testTimerNotDeterministic() {
+    final String timerId = "foo";
+
+    // Register the expected failure first; it should come from applying the timer-using DoFn
+    // to Double-keyed input, because DoubleCoder is not deterministic.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("timer");
+    thrown.expectMessage("deterministic");
+
+    DoFn<KV<Double, String>, Integer> doubleKeyedTimerFn =
+        new DoFn<KV<Double, String>, Integer>() {
+          @TimerId(timerId)
+          private final TimerSpec eventTimeTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer t) {}
+
+          @OnTimer(timerId)
+          public void onTimer() {}
+        };
+
+    pipeline
+        .apply(
+            Create.of(
+                KV.of(1.0, "hello"),
+                KV.of(5.4, "goodbye"),
+                KV.of(7.2, "hello again")))
+        .apply(ParDo.of(doubleKeyedTimerFn));
+  }
+
+ @Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class})
public void testValueStateCoderInference() {
final String stateId = "foo";