You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/28 23:46:08 UTC
incubator-beam git commit: Move tests from the sdk folder to the
sdks/java/core folder
Repository: incubator-beam
Updated Branches:
refs/heads/master 4f91c2eae -> cd0b6ec7d
Move tests from the sdk folder to the sdks/java/core folder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd0b6ec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd0b6ec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd0b6ec7
Branch: refs/heads/master
Commit: cd0b6ec7de2f335bbd7b8462cba049e9f0aca38a
Parents: 4f91c2e
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 14:45:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 14:45:37 2016 -0700
----------------------------------------------------------------------
.../EncodabilityEnforcementFactoryTest.java | 260 -------------------
.../ImmutabilityEnforcementFactoryTest.java | 130 ----------
.../EncodabilityEnforcementFactoryTest.java | 260 +++++++++++++++++++
.../ImmutabilityEnforcementFactoryTest.java | 130 ++++++++++
4 files changed, 390 insertions(+), 390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
deleted file mode 100644
index dcc9775..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.isA;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-
-/**
- * Tests for {@link EncodabilityEnforcementFactory}.
- */
-public class EncodabilityEnforcementFactoryTest {
- @Rule public ExpectedException thrown = ExpectedException.none();
- private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
-
- @Test
- public void encodeFailsThrows() {
- TestPipeline p = TestPipeline.create();
- PCollection<Record> unencodable =
- p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
- AppliedPTransform<?, ?, ?> consumer =
- unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
- WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
- CommittedBundle<Record> input =
- InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
- ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(CoderException.class));
- thrown.expectMessage("Encode not allowed");
- enforcement.beforeElement(record);
- }
-
- @Test
- public void decodeFailsThrows() {
- TestPipeline p = TestPipeline.create();
- PCollection<Record> unencodable =
- p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
- AppliedPTransform<?, ?, ?> consumer =
- unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
- WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
-
- CommittedBundle<Record> input =
- InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
- ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(CoderException.class));
- thrown.expectMessage("Decode not allowed");
- enforcement.beforeElement(record);
- }
-
- @Test
- public void consistentWithEqualsStructuralValueNotEqualThrows() {
- TestPipeline p = TestPipeline.create();
- PCollection<Record> unencodable =
- p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
- AppliedPTransform<?, ?, ?> consumer =
- unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
- WindowedValue<Record> record =
- WindowedValue.<Record>valueInGlobalWindow(
- new Record() {
- @Override
- public String toString() {
- return "OriginalRecord";
- }
- });
-
- CommittedBundle<Record> input =
- InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
- ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalArgumentException.class));
- thrown.expectMessage("does not maintain structural value equality");
- thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
- thrown.expectMessage("OriginalRecord");
- enforcement.beforeElement(record);
- }
-
- @Test
- public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
- TestPipeline p = TestPipeline.create();
- PCollection<Record> unencodable =
- p.apply(
- Create.of(new Record())
- .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder()));
- AppliedPTransform<?, ?, ?> consumer =
- unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
- WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
-
- CommittedBundle<Record> input =
- InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
- ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
- enforcement.beforeElement(record);
- enforcement.afterElement(record);
- enforcement.afterFinish(
- input,
- StepTransformResult.withoutHold(consumer).build(),
- Collections.<CommittedBundle<?>>emptyList());
- }
-
- @Test
- public void structurallyEqualResultsSucceeds() {
- TestPipeline p = TestPipeline.create();
- PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of()));
- AppliedPTransform<?, ?, ?> consumer =
- unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
-
- WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
-
- CommittedBundle<Integer> input =
- InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
- ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
-
- enforcement.beforeElement(value);
- enforcement.afterElement(value);
- enforcement.afterFinish(
- input,
- StepTransformResult.withoutHold(consumer).build(),
- Collections.<CommittedBundle<?>>emptyList());
- }
-
- private static class Record {}
- private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
-
- @Override
- public void encode(
- Record value,
- OutputStream outStream,
- com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- throw new CoderException("Encode not allowed");
- }
-
- @Override
- public Record decode(
- InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- return null;
- }
- }
-
- private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
- @Override
- public void encode(
- Record value,
- OutputStream outStream,
- com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {}
-
- @Override
- public Record decode(
- InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- throw new CoderException("Decode not allowed");
- }
- }
-
- private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
- @Override
- public void encode(
- Record value,
- OutputStream outStream,
- com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {}
-
- @Override
- public Record decode(
- InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- return new Record() {
- @Override
- public String toString() {
- return "DecodedRecord";
- }
- };
- }
-
- @Override
- public boolean consistentWithEquals() {
- return true;
- }
-
- @Override
- public Object structuralValue(Record value) {
- return value;
- }
- }
-
- private static class RecordNotConsistentWithEqualsStructuralValueCoder
- extends AtomicCoder<Record> {
- @Override
- public void encode(
- Record value,
- OutputStream outStream,
- com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {}
-
- @Override
- public Record decode(
- InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
- throws CoderException, IOException {
- return new Record() {
- @Override
- public String toString() {
- return "DecodedRecord";
- }
- };
- }
-
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-
- @Override
- public Object structuralValue(Record value) {
- return value;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
deleted file mode 100644
index 87e12ce..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.isA;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collections;
-
-/**
- * Tests for {@link ImmutabilityEnforcementFactory}.
- */
-@RunWith(JUnit4.class)
-public class ImmutabilityEnforcementFactoryTest implements Serializable {
- @Rule public transient ExpectedException thrown = ExpectedException.none();
- private transient ImmutabilityEnforcementFactory factory;
- private transient PCollection<byte[]> pcollection;
- private transient AppliedPTransform<?, ?, ?> consumer;
-
- @Before
- public void setup() {
- factory = new ImmutabilityEnforcementFactory();
- TestPipeline p = TestPipeline.create();
- pcollection =
- p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
- .apply(
- ParDo.of(
- new DoFn<byte[], byte[]>() {
- @Override
- public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
- throws Exception {
- c.element()[0] = 'b';
- }
- }));
- consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
- }
-
- @Test
- public void unchangedSucceeds() {
- WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
- CommittedBundle<byte[]> elements =
- InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
- ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
- enforcement.beforeElement(element);
- enforcement.afterElement(element);
- enforcement.afterFinish(
- elements,
- StepTransformResult.withoutHold(consumer).build(),
- Collections.<CommittedBundle<?>>emptyList());
- }
-
- @Test
- public void mutatedDuringProcessElementThrows() {
- WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
- CommittedBundle<byte[]> elements =
- InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
- ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
- enforcement.beforeElement(element);
- element.getValue()[0] = 'f';
- thrown.equals(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
- thrown.expectMessage(consumer.getFullName());
- thrown.expectMessage("illegaly mutated");
- thrown.expectMessage("Input values must not be mutated");
- enforcement.afterElement(element);
- enforcement.afterFinish(
- elements,
- StepTransformResult.withoutHold(consumer).build(),
- Collections.<CommittedBundle<?>>emptyList());
- }
-
- @Test
- public void mutatedAfterProcessElementFails() {
-
- WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
- CommittedBundle<byte[]> elements =
- InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
- ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
- enforcement.beforeElement(element);
- enforcement.afterElement(element);
-
- element.getValue()[0] = 'f';
- thrown.equals(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
- thrown.expectMessage(consumer.getFullName());
- thrown.expectMessage("illegaly mutated");
- thrown.expectMessage("Input values must not be mutated");
- enforcement.afterFinish(
- elements,
- StepTransformResult.withoutHold(consumer).build(),
- Collections.<CommittedBundle<?>>emptyList());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..dcc9775
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+/**
+ * Tests for {@link EncodabilityEnforcementFactory}.
+ */
+public class EncodabilityEnforcementFactoryTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
+
+ @Test
+ public void encodeFailsThrows() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> unencodable =
+ p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+ AppliedPTransform<?, ?, ?> consumer =
+ unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+ WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+ CommittedBundle<Record> input =
+ InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+ ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Encode not allowed");
+ enforcement.beforeElement(record);
+ }
+
+ @Test
+ public void decodeFailsThrows() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> unencodable =
+ p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
+ AppliedPTransform<?, ?, ?> consumer =
+ unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+ WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+
+ CommittedBundle<Record> input =
+ InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+ ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(CoderException.class));
+ thrown.expectMessage("Decode not allowed");
+ enforcement.beforeElement(record);
+ }
+
+ @Test
+ public void consistentWithEqualsStructuralValueNotEqualThrows() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> unencodable =
+ p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
+ AppliedPTransform<?, ?, ?> consumer =
+ unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+ WindowedValue<Record> record =
+ WindowedValue.<Record>valueInGlobalWindow(
+ new Record() {
+ @Override
+ public String toString() {
+ return "OriginalRecord";
+ }
+ });
+
+ CommittedBundle<Record> input =
+ InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+ ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalArgumentException.class));
+ thrown.expectMessage("does not maintain structural value equality");
+ thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
+ thrown.expectMessage("OriginalRecord");
+ enforcement.beforeElement(record);
+ }
+
+ @Test
+ public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Record> unencodable =
+ p.apply(
+ Create.of(new Record())
+ .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder()));
+ AppliedPTransform<?, ?, ?> consumer =
+ unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+ WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
+
+ CommittedBundle<Record> input =
+ InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+ ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+ enforcement.beforeElement(record);
+ enforcement.afterElement(record);
+ enforcement.afterFinish(
+ input,
+ StepTransformResult.withoutHold(consumer).build(),
+ Collections.<CommittedBundle<?>>emptyList());
+ }
+
+ @Test
+ public void structurallyEqualResultsSucceeds() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of()));
+ AppliedPTransform<?, ?, ?> consumer =
+ unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
+
+ WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
+
+ CommittedBundle<Integer> input =
+ InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
+ ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
+
+ enforcement.beforeElement(value);
+ enforcement.afterElement(value);
+ enforcement.afterFinish(
+ input,
+ StepTransformResult.withoutHold(consumer).build(),
+ Collections.<CommittedBundle<?>>emptyList());
+ }
+
+ private static class Record {}
+ private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+
+ @Override
+ public void encode(
+ Record value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ throw new CoderException("Encode not allowed");
+ }
+
+ @Override
+ public Record decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ return null;
+ }
+ }
+
+ private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+ @Override
+ public void encode(
+ Record value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public Record decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ throw new CoderException("Decode not allowed");
+ }
+ }
+
+ private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+ @Override
+ public void encode(
+ Record value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public Record decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ return new Record() {
+ @Override
+ public String toString() {
+ return "DecodedRecord";
+ }
+ };
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ @Override
+ public Object structuralValue(Record value) {
+ return value;
+ }
+ }
+
+ private static class RecordNotConsistentWithEqualsStructuralValueCoder
+ extends AtomicCoder<Record> {
+ @Override
+ public void encode(
+ Record value,
+ OutputStream outStream,
+ com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {}
+
+ @Override
+ public Record decode(
+ InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+ throws CoderException, IOException {
+ return new Record() {
+ @Override
+ public String toString() {
+ return "DecodedRecord";
+ }
+ };
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return false;
+ }
+
+ @Override
+ public Object structuralValue(Record value) {
+ return value;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..87e12ce
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collections;
+
+/**
+ * Tests for {@link ImmutabilityEnforcementFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityEnforcementFactoryTest implements Serializable {
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+ private transient ImmutabilityEnforcementFactory factory;
+ private transient PCollection<byte[]> pcollection;
+ private transient AppliedPTransform<?, ?, ?> consumer;
+
+ @Before
+ public void setup() {
+ factory = new ImmutabilityEnforcementFactory();
+ TestPipeline p = TestPipeline.create();
+ pcollection =
+ p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], byte[]>() {
+ @Override
+ public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
+ throws Exception {
+ c.element()[0] = 'b';
+ }
+ }));
+ consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+ }
+
+ @Test
+ public void unchangedSucceeds() {
+ WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+ CommittedBundle<byte[]> elements =
+ InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+ ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+ enforcement.beforeElement(element);
+ enforcement.afterElement(element);
+ enforcement.afterFinish(
+ elements,
+ StepTransformResult.withoutHold(consumer).build(),
+ Collections.<CommittedBundle<?>>emptyList());
+ }
+
+ @Test
+ public void mutatedDuringProcessElementThrows() {
+ WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+ CommittedBundle<byte[]> elements =
+ InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+ ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+ enforcement.beforeElement(element);
+ element.getValue()[0] = 'f';
+ thrown.equals(UserCodeException.class);
+ thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectMessage(consumer.getFullName());
+ thrown.expectMessage("illegaly mutated");
+ thrown.expectMessage("Input values must not be mutated");
+ enforcement.afterElement(element);
+ enforcement.afterFinish(
+ elements,
+ StepTransformResult.withoutHold(consumer).build(),
+ Collections.<CommittedBundle<?>>emptyList());
+ }
+
+ @Test
+ public void mutatedAfterProcessElementFails() {
+
+ WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+ CommittedBundle<byte[]> elements =
+ InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+ ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+ enforcement.beforeElement(element);
+ enforcement.afterElement(element);
+
+ element.getValue()[0] = 'f';
+ thrown.equals(UserCodeException.class);
+ thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expectMessage(consumer.getFullName());
+ thrown.expectMessage("illegaly mutated");
+ thrown.expectMessage("Input values must not be mutated");
+ enforcement.afterFinish(
+ elements,
+ StepTransformResult.withoutHold(consumer).build(),
+ Collections.<CommittedBundle<?>>emptyList());
+ }
+}
+