You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/08 02:35:13 UTC

[1/2] incubator-beam git commit: Relocate Immutability Enforcement Tests

Repository: incubator-beam
Updated Branches:
  refs/heads/master 787b3351a -> 9a8cb95db


Relocate Immutability Enforcement Tests

These tests exercise runner behavior rather than the model, so they
belong in the runner's test suite.

Stop wrapping IllegalMutationExceptions so that failures caused by
model invariant violations surface directly, rather than going through
a PipelineExecutionException.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de00bd82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de00bd82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de00bd82

Branch: refs/heads/master
Commit: de00bd8269d03ed6eee8fa51c6f7c803e384fd50
Parents: 2173000
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 24 18:05:16 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 7 15:14:20 2016 -0700

----------------------------------------------------------------------
 .../ImmutabilityCheckingBundleFactory.java      |  22 ++-
 .../ImmutabilityCheckingBundleFactoryTest.java  |  11 +-
 .../direct/InProcessPipelineRunnerTest.java     | 138 ++++++++++++++++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 136 +-----------------
 4 files changed, 148 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 92a57dd..2a965ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.MutationDetector;
 import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -111,17 +110,16 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
         try {
           detector.verifyUnmodified();
         } catch (IllegalMutationException exn) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s mutated value %s after it was output (new value was %s)."
-                          + " Values must not be mutated in any way after being output.",
-                      underlying.getPCollection().getProducingTransformInternal().getFullName(),
-                      exn.getSavedValue(),
-                      exn.getNewValue()),
-                  exn.getSavedValue(),
-                  exn.getNewValue(),
-                  exn));
+            throw new IllegalMutationException(
+                String.format(
+                    "PTransform %s mutated value %s after it was output (new value was %s)."
+                        + " Values must not be mutated in any way after being output.",
+                    underlying.getPCollection().getProducingTransformInternal().getFullName(),
+                    exn.getSavedValue(),
+                    exn.getNewValue()),
+                exn.getSavedValue(),
+                exn.getNewValue(),
+                exn);
         }
       }
       return underlying.commit(synchronizedProcessingTime);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 2e7847d..20670ca 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
@@ -32,7 +31,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -168,8 +166,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
     root.add(WindowedValue.valueInGlobalWindow(array));
 
     array[1] = 2;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being output");
     CommittedBundle<byte[]> committed = root.commit(Instant.now());
   }
@@ -191,8 +188,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
     keyed.add(windowedArray);
 
     array[0] = Byte.MAX_VALUE;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being output");
     CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
   }
@@ -212,8 +208,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
     intermediate.add(windowedArray);
 
     array[2] = -3;
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage("Values must not be mutated in any way after being output");
     CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 9314f5e..5c26ac3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -44,6 +46,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 
 import com.google.common.collect.ImmutableMap;
 
+
 import com.fasterxml.jackson.annotation.JsonValue;
 
 import org.junit.Rule;
@@ -54,6 +57,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -63,6 +68,14 @@ import java.util.Map;
 public class InProcessPipelineRunnerTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
+  private Pipeline getPipeline() {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    opts.setRunner(InProcessPipelineRunner.class);
+
+    Pipeline p = Pipeline.create(opts);
+    return p;
+  }
+
   @Test
   public void wordCountShouldSucceed() throws Throwable {
     Pipeline p = getPipeline();
@@ -192,11 +205,126 @@ public class InProcessPipelineRunnerTest implements Serializable {
   }
 
 
-  private Pipeline getPipeline() {
-    PipelineOptions opts = PipelineOptionsFactory.create();
-    opts.setRunner(InProcessPipelineRunner.class);
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+   * {@link InProcessPipelineRunner}.
+   */
+  @Test
+  public void testMutatingOutputThenOutputDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
 
-    Pipeline p = Pipeline.create(opts);
-    return p;
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+            c.output(outputList);
+            outputList.set(0, 37);
+            c.output(outputList);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+   * {@link InProcessPipelineRunner}.
+   */
+  @Test
+  public void testMutatingOutputThenTerminateDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+            c.output(outputList);
+            outputList.set(0, 37);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
+   * in the {@link InProcessPipelineRunner}.
+   */
+  @Test
+  public void testMutatingOutputCoderDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(42))
+        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+          @Override public void processElement(ProcessContext c) {
+            byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
+            c.output(outputArray);
+            outputArray[0] = 0xa;
+            c.output(outputArray);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("output");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
+   * {@link InProcessPipelineRunner}.
+   */
+  @Test
+  public void testMutatingInputDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
+            .withCoder(ListCoder.of(VarIntCoder.of())))
+        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+          @Override public void processElement(ProcessContext c) {
+            List<Integer> inputList = c.element();
+            inputList.set(0, 37);
+            c.output(12);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
+  }
+
+  /**
+   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
+   * in the {@link InProcessPipelineRunner}.
+   */
+  @Test
+  public void testMutatingInputCoderDoFnError() throws Exception {
+    Pipeline pipeline = getPipeline();
+
+    pipeline
+        .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
+        .apply(ParDo.of(new DoFn<byte[], Integer>() {
+          @Override public void processElement(ProcessContext c) {
+            byte[] inputArray = c.element();
+            inputArray[0] = 0xa;
+            c.output(13);
+          }
+        }));
+
+    thrown.expect(IllegalMutationException.class);
+    thrown.expectMessage("Input");
+    thrown.expectMessage("must not be mutated");
+    pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5f0f8ec..03ecf6f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -24,24 +24,19 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
-
 import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.hamcrest.core.AnyOf.anyOf;
-import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -53,7 +48,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IllegalMutationException;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -1420,132 +1414,6 @@ public class ParDoTest implements Serializable {
     thrown.expectMessage("WindowFn attempted to access input timestamp when none was available");
     pipeline.run();
   }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
-   * {@link DirectPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputThenOutputDoFnError() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
-            c.output(outputList);
-            outputList.set(0, 37);
-            c.output(outputList);
-          }
-        }));
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
-   * {@link DirectPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputThenTerminateDoFnError() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
-            c.output(outputList);
-            outputList.set(0, 37);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
-   * in the {@link DirectPipelineRunner}.
-   */
-  @Test
-  public void testMutatingOutputCoderDoFnError() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline
-        .apply(Create.of(42))
-        .apply(ParDo.of(new DoFn<Integer, byte[]>() {
-          @Override public void processElement(ProcessContext c) {
-            byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
-            c.output(outputArray);
-            outputArray[0] = 0xa;
-            c.output(outputArray);
-          }
-        }));
-
-    thrown.expect(PipelineExecutionException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage("output");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
-   * {@link DirectPipelineRunner}.
-   */
-  @Test
-  public void testMutatingInputDoFnError() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline
-        .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
-            .withCoder(ListCoder.of(VarIntCoder.of())))
-        .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
-          @Override public void processElement(ProcessContext c) {
-            List<Integer> inputList = c.element();
-            inputList.set(0, 37);
-            c.output(12);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("input");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
-  /**
-   * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
-   * in the {@link DirectPipelineRunner}.
-   */
-  @Test
-  public void testMutatingInputCoderDoFnError() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
-    pipeline
-        .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
-        .apply(ParDo.of(new DoFn<byte[], Integer>() {
-          @Override public void processElement(ProcessContext c) {
-            byte[] inputArray = c.element();
-            inputArray[0] = 0xa;
-            c.output(13);
-          }
-        }));
-
-    thrown.expect(IllegalMutationException.class);
-    thrown.expectMessage("input");
-    thrown.expectMessage("must not be mutated");
-    pipeline.run();
-  }
-
   @Test
   public void testDoFnDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {


[2/2] incubator-beam git commit: This closes #387

Posted by ke...@apache.org.
This closes #387


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a8cb95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a8cb95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a8cb95d

Branch: refs/heads/master
Commit: 9a8cb95db5dcc8d50030ca030bf5e8745c29e3e8
Parents: 787b335 de00bd8
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 7 19:34:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 7 19:34:24 2016 -0700

----------------------------------------------------------------------
 .../ImmutabilityCheckingBundleFactory.java      |  22 ++-
 .../ImmutabilityCheckingBundleFactoryTest.java  |  11 +-
 .../direct/InProcessPipelineRunnerTest.java     | 138 ++++++++++++++++++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 136 +-----------------
 4 files changed, 148 insertions(+), 159 deletions(-)
----------------------------------------------------------------------