You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/28 23:46:08 UTC

incubator-beam git commit: Move tests from the sdk folder to the sdks/java/core folder

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4f91c2eae -> cd0b6ec7d


Move tests from the sdk folder to the sdks/java/core folder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd0b6ec7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd0b6ec7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd0b6ec7

Branch: refs/heads/master
Commit: cd0b6ec7de2f335bbd7b8462cba049e9f0aca38a
Parents: 4f91c2e
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 14:45:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 14:45:37 2016 -0700

----------------------------------------------------------------------
 .../EncodabilityEnforcementFactoryTest.java     | 260 -------------------
 .../ImmutabilityEnforcementFactoryTest.java     | 130 ----------
 .../EncodabilityEnforcementFactoryTest.java     | 260 +++++++++++++++++++
 .../ImmutabilityEnforcementFactoryTest.java     | 130 ++++++++++
 4 files changed, 390 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
deleted file mode 100644
index dcc9775..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.isA;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-
-/**
- * Tests for {@link EncodabilityEnforcementFactory}.
- */
-public class EncodabilityEnforcementFactoryTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
-
-  @Test
-  public void encodeFailsThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
-    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
-    CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(CoderException.class));
-    thrown.expectMessage("Encode not allowed");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void decodeFailsThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
-
-    CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(CoderException.class));
-    thrown.expectMessage("Decode not allowed");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void consistentWithEqualsStructuralValueNotEqualThrows() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
-    WindowedValue<Record> record =
-        WindowedValue.<Record>valueInGlobalWindow(
-            new Record() {
-              @Override
-              public String toString() {
-                return "OriginalRecord";
-              }
-            });
-
-    CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalArgumentException.class));
-    thrown.expectMessage("does not maintain structural value equality");
-    thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
-    thrown.expectMessage("OriginalRecord");
-    enforcement.beforeElement(record);
-  }
-
-  @Test
-  public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Record> unencodable =
-        p.apply(
-            Create.of(new Record())
-                .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
-
-    WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
-
-    CommittedBundle<Record> input =
-        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
-    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
-
-    enforcement.beforeElement(record);
-    enforcement.afterElement(record);
-    enforcement.afterFinish(
-        input,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  @Test
-  public void structurallyEqualResultsSucceeds() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of()));
-    AppliedPTransform<?, ?, ?> consumer =
-        unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
-
-    WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
-
-    CommittedBundle<Integer> input =
-        InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
-    ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
-
-    enforcement.beforeElement(value);
-    enforcement.afterElement(value);
-    enforcement.afterFinish(
-        input,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  private static class Record {}
-  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
-
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      throw new CoderException("Encode not allowed");
-    }
-
-    @Override
-    public Record decode(
-        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return null;
-    }
-  }
-
-  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      throw new CoderException("Decode not allowed");
-    }
-  }
-
-  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return true;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-  private static class RecordNotConsistentWithEqualsStructuralValueCoder
-      extends AtomicCoder<Record> {
-    @Override
-    public void encode(
-        Record value,
-        OutputStream outStream,
-        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {}
-
-    @Override
-    public Record decode(
-        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
-        throws CoderException, IOException {
-      return new Record() {
-        @Override
-        public String toString() {
-          return "DecodedRecord";
-        }
-      };
-    }
-
-    @Override
-    public boolean consistentWithEquals() {
-      return false;
-    }
-
-    @Override
-    public Object structuralValue(Record value) {
-      return value;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
deleted file mode 100644
index 87e12ce..0000000
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static org.hamcrest.Matchers.isA;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Collections;
-
-/**
- * Tests for {@link ImmutabilityEnforcementFactory}.
- */
-@RunWith(JUnit4.class)
-public class ImmutabilityEnforcementFactoryTest implements Serializable {
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  private transient ImmutabilityEnforcementFactory factory;
-  private transient PCollection<byte[]> pcollection;
-  private transient AppliedPTransform<?, ?, ?> consumer;
-
-  @Before
-  public void setup() {
-    factory = new ImmutabilityEnforcementFactory();
-    TestPipeline p = TestPipeline.create();
-    pcollection =
-        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
-            .apply(
-                ParDo.of(
-                    new DoFn<byte[], byte[]>() {
-                      @Override
-                      public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
-                          throws Exception {
-                        c.element()[0] = 'b';
-                      }
-                    }));
-    consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
-  }
-
-  @Test
-  public void unchangedSucceeds() {
-    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
-    enforcement.beforeElement(element);
-    enforcement.afterElement(element);
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  @Test
-  public void mutatedDuringProcessElementThrows() {
-    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
-    enforcement.beforeElement(element);
-    element.getValue()[0] = 'f';
-    thrown.equals(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage(consumer.getFullName());
-    thrown.expectMessage("illegaly mutated");
-    thrown.expectMessage("Input values must not be mutated");
-    enforcement.afterElement(element);
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-
-  @Test
-  public void mutatedAfterProcessElementFails() {
-
-    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
-    CommittedBundle<byte[]> elements =
-        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
-
-    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
-    enforcement.beforeElement(element);
-    enforcement.afterElement(element);
-
-    element.getValue()[0] = 'f';
-    thrown.equals(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
-    thrown.expectMessage(consumer.getFullName());
-    thrown.expectMessage("illegaly mutated");
-    thrown.expectMessage("Input values must not be mutated");
-    enforcement.afterFinish(
-        elements,
-        StepTransformResult.withoutHold(consumer).build(),
-        Collections.<CommittedBundle<?>>emptyList());
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..dcc9775
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+/**
+ * Tests for {@link EncodabilityEnforcementFactory}.
+ */
+public class EncodabilityEnforcementFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
+
+  @Test
+  public void encodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Encode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void decodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Decode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void consistentWithEqualsStructuralValueNotEqualThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record =
+        WindowedValue.<Record>valueInGlobalWindow(
+            new Record() {
+              @Override
+              public String toString() {
+                return "OriginalRecord";
+              }
+            });
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("does not maintain structural value equality");
+    thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
+    thrown.expectMessage("OriginalRecord");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(
+            Create.of(new Record())
+                .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(record);
+    enforcement.afterElement(record);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void structurallyEqualResultsSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
+
+    WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
+
+    CommittedBundle<Integer> input =
+        InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
+    ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(value);
+    enforcement.afterElement(value);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  private static class Record {}
+  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Encode not allowed");
+    }
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return null;
+    }
+  }
+
+  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Decode not allowed");
+    }
+  }
+
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+  private static class RecordNotConsistentWithEqualsStructuralValueCoder
+      extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return false;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0b6ec7/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..87e12ce
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collections;
+
+/**
+ * Tests for {@link ImmutabilityEnforcementFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityEnforcementFactoryTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private transient ImmutabilityEnforcementFactory factory;
+  private transient PCollection<byte[]> pcollection;
+  private transient AppliedPTransform<?, ?, ?> consumer;
+
+  @Before
+  public void setup() {
+    factory = new ImmutabilityEnforcementFactory();
+    TestPipeline p = TestPipeline.create();
+    pcollection =
+        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+            .apply(
+                ParDo.of(
+                    new DoFn<byte[], byte[]>() {
+                      @Override
+                      public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
+                          throws Exception {
+                        c.element()[0] = 'b';
+                      }
+                    }));
+    consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void unchangedSucceeds() {
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedDuringProcessElementThrows() {
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    element.getValue()[0] = 'f';
+    thrown.equals(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedAfterProcessElementFails() {
+
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+
+    element.getValue()[0] = 'f';
+    thrown.equals(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+}
+