You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/08 02:35:13 UTC
[1/2] incubator-beam git commit: Relocate Immutability Enforcement
Tests
Repository: incubator-beam
Updated Branches:
refs/heads/master 787b3351a -> 9a8cb95db
Relocate Immutability Enforcement Tests
These tests are of runner behavior rather than the model, and should be
tested as a runner test.
Stop wrapping IllegalMutationExceptions to surface failures due to
model invariant violations directly, rather than going through a
PipelineExecutionException.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de00bd82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de00bd82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de00bd82
Branch: refs/heads/master
Commit: de00bd8269d03ed6eee8fa51c6f7c803e384fd50
Parents: 2173000
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 24 18:05:16 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 7 15:14:20 2016 -0700
----------------------------------------------------------------------
.../ImmutabilityCheckingBundleFactory.java | 22 ++-
.../ImmutabilityCheckingBundleFactoryTest.java | 11 +-
.../direct/InProcessPipelineRunnerTest.java | 138 ++++++++++++++++++-
.../apache/beam/sdk/transforms/ParDoTest.java | 136 +-----------------
4 files changed, 148 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 92a57dd..2a965ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -111,17 +110,16 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
try {
detector.verifyUnmodified();
} catch (IllegalMutationException exn) {
- throw UserCodeException.wrap(
- new IllegalMutationException(
- String.format(
- "PTransform %s mutated value %s after it was output (new value was %s)."
- + " Values must not be mutated in any way after being output.",
- underlying.getPCollection().getProducingTransformInternal().getFullName(),
- exn.getSavedValue(),
- exn.getNewValue()),
- exn.getSavedValue(),
- exn.getNewValue(),
- exn));
+ throw new IllegalMutationException(
+ String.format(
+ "PTransform %s mutated value %s after it was output (new value was %s)."
+ + " Values must not be mutated in any way after being output.",
+ underlying.getPCollection().getProducingTransformInternal().getFullName(),
+ exn.getSavedValue(),
+ exn.getNewValue()),
+ exn.getSavedValue(),
+ exn.getNewValue(),
+ exn);
}
}
return underlying.commit(synchronizedProcessingTime);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 2e7847d..20670ca 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.direct;
import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
@@ -32,7 +31,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
@@ -168,8 +166,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
root.add(WindowedValue.valueInGlobalWindow(array));
array[1] = 2;
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = root.commit(Instant.now());
}
@@ -191,8 +188,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
keyed.add(windowedArray);
array[0] = Byte.MAX_VALUE;
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = keyed.commit(Instant.now());
}
@@ -212,8 +208,7 @@ public class ImmutabilityCheckingBundleFactoryTest {
intermediate.add(windowedArray);
array[2] = -3;
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expect(IllegalMutationException.class);
thrown.expectMessage("Values must not be mutated in any way after being output");
CommittedBundle<byte[]> committed = intermediate.commit(Instant.now());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 9314f5e..5c26ac3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -44,6 +46,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
import com.google.common.collect.ImmutableMap;
+
import com.fasterxml.jackson.annotation.JsonValue;
import org.junit.Rule;
@@ -54,6 +57,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
/**
@@ -63,6 +68,14 @@ import java.util.Map;
public class InProcessPipelineRunnerTest implements Serializable {
@Rule public transient ExpectedException thrown = ExpectedException.none();
+ private Pipeline getPipeline() {
+ PipelineOptions opts = PipelineOptionsFactory.create();
+ opts.setRunner(InProcessPipelineRunner.class);
+
+ Pipeline p = Pipeline.create(opts);
+ return p;
+ }
+
@Test
public void wordCountShouldSucceed() throws Throwable {
Pipeline p = getPipeline();
@@ -192,11 +205,126 @@ public class InProcessPipelineRunnerTest implements Serializable {
}
- private Pipeline getPipeline() {
- PipelineOptions opts = PipelineOptionsFactory.create();
- opts.setRunner(InProcessPipelineRunner.class);
+ /**
+ * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+ * {@link InProcessPipelineRunner}.
+ */
+ @Test
+ public void testMutatingOutputThenOutputDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
- Pipeline p = Pipeline.create(opts);
- return p;
+ pipeline
+ .apply(Create.of(42))
+ .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ @Override public void processElement(ProcessContext c) {
+ List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+ c.output(outputList);
+ outputList.set(0, 37);
+ c.output(outputList);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("output");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
+ }
+
+ /**
+ * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
+ * {@link InProcessPipelineRunner}.
+ */
+ @Test
+ public void testMutatingOutputThenTerminateDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
+
+ pipeline
+ .apply(Create.of(42))
+ .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
+ @Override public void processElement(ProcessContext c) {
+ List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
+ c.output(outputList);
+ outputList.set(0, 37);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("output");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
+ }
+
+ /**
+ * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
+ * in the {@link InProcessPipelineRunner}.
+ */
+ @Test
+ public void testMutatingOutputCoderDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
+
+ pipeline
+ .apply(Create.of(42))
+ .apply(ParDo.of(new DoFn<Integer, byte[]>() {
+ @Override public void processElement(ProcessContext c) {
+ byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
+ c.output(outputArray);
+ outputArray[0] = 0xa;
+ c.output(outputArray);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("output");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
+ }
+
+ /**
+ * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
+ * {@link InProcessPipelineRunner}.
+ */
+ @Test
+ public void testMutatingInputDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
+
+ pipeline
+ .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
+ .withCoder(ListCoder.of(VarIntCoder.of())))
+ .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
+ @Override public void processElement(ProcessContext c) {
+ List<Integer> inputList = c.element();
+ inputList.set(0, 37);
+ c.output(12);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("Input");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
+ }
+
+ /**
+ * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
+ * in the {@link InProcessPipelineRunner}.
+ */
+ @Test
+ public void testMutatingInputCoderDoFnError() throws Exception {
+ Pipeline pipeline = getPipeline();
+
+ pipeline
+ .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
+ .apply(ParDo.of(new DoFn<byte[], Integer>() {
+ @Override public void processElement(ProcessContext c) {
+ byte[] inputArray = c.element();
+ inputArray[0] = 0xa;
+ c.output(13);
+ }
+ }));
+
+ thrown.expect(IllegalMutationException.class);
+ thrown.expectMessage("Input");
+ thrown.expectMessage("must not be mutated");
+ pipeline.run();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de00bd82/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 5f0f8ec..03ecf6f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -24,24 +24,19 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
-
import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
-import static org.hamcrest.core.AnyOf.anyOf;
-import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
@@ -53,7 +48,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -1420,132 +1414,6 @@ public class ParDoTest implements Serializable {
thrown.expectMessage("WindowFn attempted to access input timestamp when none was available");
pipeline.run();
}
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
- * {@link DirectPipelineRunner}.
- */
- @Test
- public void testMutatingOutputThenOutputDoFnError() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
- c.output(outputList);
- outputList.set(0, 37);
- c.output(outputList);
- }
- }));
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a good equals() fails in the
- * {@link DirectPipelineRunner}.
- */
- @Test
- public void testMutatingOutputThenTerminateDoFnError() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, List<Integer>>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> outputList = Arrays.asList(1, 2, 3, 4);
- c.output(outputList);
- outputList.set(0, 37);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an output with a bad equals() still fails
- * in the {@link DirectPipelineRunner}.
- */
- @Test
- public void testMutatingOutputCoderDoFnError() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- pipeline
- .apply(Create.of(42))
- .apply(ParDo.of(new DoFn<Integer, byte[]>() {
- @Override public void processElement(ProcessContext c) {
- byte[] outputArray = new byte[]{0x1, 0x2, 0x3};
- c.output(outputArray);
- outputArray[0] = 0xa;
- c.output(outputArray);
- }
- }));
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
- thrown.expectMessage("output");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates its input with a good equals() fails in the
- * {@link DirectPipelineRunner}.
- */
- @Test
- public void testMutatingInputDoFnError() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- pipeline
- .apply(Create.of(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
- .withCoder(ListCoder.of(VarIntCoder.of())))
- .apply(ParDo.of(new DoFn<List<Integer>, Integer>() {
- @Override public void processElement(ProcessContext c) {
- List<Integer> inputList = c.element();
- inputList.set(0, 37);
- c.output(12);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("input");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
- /**
- * Tests that a {@link DoFn} that mutates an input with a bad equals() still fails
- * in the {@link DirectPipelineRunner}.
- */
- @Test
- public void testMutatingInputCoderDoFnError() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- pipeline
- .apply(Create.of(new byte[]{0x1, 0x2, 0x3}, new byte[]{0x4, 0x5, 0x6}))
- .apply(ParDo.of(new DoFn<byte[], Integer>() {
- @Override public void processElement(ProcessContext c) {
- byte[] inputArray = c.element();
- inputArray[0] = 0xa;
- c.output(13);
- }
- }));
-
- thrown.expect(IllegalMutationException.class);
- thrown.expectMessage("input");
- thrown.expectMessage("must not be mutated");
- pipeline.run();
- }
-
@Test
public void testDoFnDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {
[2/2] incubator-beam git commit: This closes #387
Posted by ke...@apache.org.
This closes #387
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a8cb95d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a8cb95d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a8cb95d
Branch: refs/heads/master
Commit: 9a8cb95db5dcc8d50030ca030bf5e8745c29e3e8
Parents: 787b335 de00bd8
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Jun 7 19:34:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Jun 7 19:34:24 2016 -0700
----------------------------------------------------------------------
.../ImmutabilityCheckingBundleFactory.java | 22 ++-
.../ImmutabilityCheckingBundleFactoryTest.java | 11 +-
.../direct/InProcessPipelineRunnerTest.java | 138 ++++++++++++++++++-
.../apache/beam/sdk/transforms/ParDoTest.java | 136 +-----------------
4 files changed, 148 insertions(+), 159 deletions(-)
----------------------------------------------------------------------