You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:06:32 UTC

[20/50] [abbrv] beam git commit: Reject stateful ParDo if coder not KvCoder with deterministic key coder

Reject stateful ParDo if coder not KvCoder with deterministic key coder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f8974672
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f8974672
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f8974672

Branch: refs/heads/DSL_SQL
Commit: f89746722419ef3c60f92d7a0fa17e4e6247b265
Parents: 81a96ab
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 5 17:24:25 2017 -0700
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/ParDo.java   |  27 +++++
 .../apache/beam/sdk/transforms/ParDoTest.java   | 102 +++++++++++++++++++
 2 files changed, 129 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index db1f791..0d03835 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.transforms.DoFn.WindowedContext;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -455,6 +456,27 @@ public class ParDo {
     }
   }
 
+  private static void validateStateApplicableForInput(
+      DoFn<?, ?> fn,
+      PCollection<?> input) {
+    Coder<?> inputCoder = input.getCoder();
+    checkArgument(
+        inputCoder instanceof KvCoder,
+        "%s requires its input to use %s in order to use state and timers.",
+        ParDo.class.getSimpleName(),
+        KvCoder.class.getSimpleName());
+
+    KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) inputCoder;
+    try {
+        kvCoder.getKeyCoder().verifyDeterministic();
+    } catch (Coder.NonDeterministicException exc) {
+      throw new IllegalArgumentException(
+          String.format(
+              "%s requires a deterministic key coder in order to use state and timers",
+              ParDo.class.getSimpleName()));
+    }
+  }
+
   /**
    * Try to provide coders for as many of the type arguments of given
    * {@link DoFnSignature.StateDeclaration} as possible.
@@ -737,6 +759,11 @@ public class ParDo {
       // Use coder registry to determine coders for all StateSpec defined in the fn signature.
       finishSpecifyingStateSpecs(fn, input.getPipeline().getCoderRegistry(), input.getCoder());
 
+      DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass());
+      if (signature.usesState() || signature.usesTimers()) {
+        validateStateApplicableForInput(fn, input);
+      }
+
       PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
           input.getPipeline(),
           TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()),

http://git-wip-us.apache.org/repos/asf/beam/blob/f8974672/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5b60ef3..fa4949e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1593,6 +1593,108 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  public void testStateNotKeyed() {
+    final String stateId = "foo";
+
+    DoFn<String, Integer> fn =
+        new DoFn<String, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> intState =
+              StateSpecs.value();
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("state");
+    thrown.expectMessage("KvCoder");
+
+    pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
+  }
+
+  @Test
+  public void testStateNotDeterministic() {
+    final String stateId = "foo";
+
+    // DoubleCoder is not deterministic, so this should crash
+    DoFn<KV<Double, String>, Integer> fn =
+        new DoFn<KV<Double, String>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<ValueState<Integer>> intState =
+              StateSpecs.value();
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("state");
+    thrown.expectMessage("deterministic");
+
+    pipeline
+        .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
+        .apply(ParDo.of(fn));
+  }
+
+  @Test
+  public void testTimerNotKeyed() {
+    final String timerId = "foo";
+
+    DoFn<String, Integer> fn =
+        new DoFn<String, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @TimerId(timerId) Timer timer) {}
+
+          @OnTimer(timerId)
+          public void onTimer() {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("timer");
+    thrown.expectMessage("KvCoder");
+
+    pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
+  }
+
+  @Test
+  public void testTimerNotDeterministic() {
+    final String timerId = "foo";
+
+    // DoubleCoder is not deterministic, so this should crash
+    DoFn<KV<Double, String>, Integer> fn =
+        new DoFn<KV<Double, String>, Integer>() {
+
+          @TimerId(timerId)
+          private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @TimerId(timerId) Timer timer) {}
+
+          @OnTimer(timerId)
+          public void onTimer() {}
+        };
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("timer");
+    thrown.expectMessage("deterministic");
+
+    pipeline
+        .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
+        .apply(ParDo.of(fn));
+  }
+
+  @Test
   @Category({ValidatesRunner.class, UsesStatefulParDo.class})
   public void testValueStateCoderInference() {
     final String stateId = "foo";