You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/28 19:19:30 UTC

[1/2] incubator-beam git commit: Add ModelEnforcements

Repository: incubator-beam
Updated Branches:
  refs/heads/master 460505175 -> f8f3745a8


Add ModelEnforcements

ModelEnforcements ensure that user-written code conforms to the model,
in order to ensure that it is portable between runners.

Add EncodabilityEnforcement and ImmutabilityEnforcement.
EncodabilityEnforcement ensures that all values can be encoded.
ImmutabilityEnforcement ensures that values are not mutated within the
processing of a Bundle.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4343e6f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4343e6f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4343e6f9

Branch: refs/heads/master
Commit: 4343e6f99b8a88d38bcdaf340fc8d2f0a5125b94
Parents: 9247ad7
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 4 08:53:50 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 10:18:39 2016 -0700

----------------------------------------------------------------------
 .../inprocess/AbstractModelEnforcement.java     |  36 +++
 .../EncodabilityEnforcementFactory.java         |  69 +++++
 .../ImmutabilityEnforcementFactory.java         |  94 +++++++
 .../sdk/runners/inprocess/ModelEnforcement.java |  61 +++++
 .../inprocess/ModelEnforcementFactory.java      |  28 ++
 .../EncodabilityEnforcementFactoryTest.java     | 260 +++++++++++++++++++
 .../ImmutabilityEnforcementFactoryTest.java     | 130 ++++++++++
 7 files changed, 678 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
new file mode 100644
index 0000000..32b2a67
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
+ */
+abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
+  @Override
+  public void beforeElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
new file mode 100644
index 0000000..0e38b55
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforces that all elements in a {@link PCollection} can be encoded using that
+ * {@link PCollection PCollection's} {@link Coder}.
+ */
+class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static EncodabilityEnforcementFactory create() {
+    return new EncodabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new EncodabilityEnforcement<>(input);
+  }
+
+  private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> {
+    private final Coder<T> coder; // Assigned once in the constructor; never reassigned.
+
+    public EncodabilityEnforcement(CommittedBundle<T> input) {
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        T clone = CoderUtils.clone(coder, element.getValue());
+        if (coder.consistentWithEquals()) { // Only then must the structural values match.
+          checkArgument(
+              coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
+              "Coder %s of class %s does not maintain structural value equality"
+                  + " on input element %s",
+              coder,
+              coder.getClass().getSimpleName(),
+              element.getValue());
+        }
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e); // Also wraps the checkArgument failure above.
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..dfc56a9
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
+ * an element.
+ *
+ * <p>Implies {@link EncodabilityEnforcementFactory}.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static ModelEnforcementFactory create() {
+    return new ImmutabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+  }
+
+  private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
+    private final AppliedPTransform<?, ?, ?> transform;
+    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+    private final Coder<T> coder;
+
+    private ImmutabilityCheckingEnforcement(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+      this.transform = transform;
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+      mutationElements = new IdentityHashMap<>();
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        mutationElements.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      for (MutationDetector detector : mutationElements.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException e) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s illegaly mutated value %s of class %s."
+                          + " Input values must not be mutated in any way.",
+                      transform.getFullName(),
+                      e.getSavedValue(),
+                      e.getSavedValue().getClass()),
+                  e.getSavedValue(),
+                  e.getNewValue()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
new file mode 100644
index 0000000..66bea37
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
+ * of the input {@link PCollection}) on construction, and then enforce per-element behavior
+ * (such as the immutability of input elements). When the element is output or the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ */
+public interface ModelEnforcement<T> {
+  /**
+   * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void beforeElement(WindowedValue<T> element);
+
+  /**
+   * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void afterElement(WindowedValue<T> element);
+
+  /**
+   * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
+   * called, producing the provided {@link InProcessTransformResult} and
+   * {@link CommittedBundle output bundles}.
+   */
+  void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
new file mode 100644
index 0000000..66c01b3
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+  <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..dcc9775
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactoryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+
+/**
+ * Tests for {@link EncodabilityEnforcementFactory}.
+ */
+public class EncodabilityEnforcementFactoryTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private EncodabilityEnforcementFactory factory = EncodabilityEnforcementFactory.create();
+
+  @Test
+  public void encodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoEncodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Encode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void decodeFailsThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordNoDecodeCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+    WindowedValue<Record> record = WindowedValue.valueInGlobalWindow(new Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(CoderException.class));
+    thrown.expectMessage("Decode not allowed");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void consistentWithEqualsStructuralValueNotEqualThrows() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(Create.of(new Record()).withCoder(new RecordStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record =
+        WindowedValue.<Record>valueInGlobalWindow(
+            new Record() {
+              @Override
+              public String toString() {
+                return "OriginalRecord";
+              }
+            });
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("does not maintain structural value equality");
+    thrown.expectMessage(RecordStructuralValueCoder.class.getSimpleName());
+    thrown.expectMessage("OriginalRecord");
+    enforcement.beforeElement(record);
+  }
+
+  @Test
+  public void notConsistentWithEqualsStructuralValueNotEqualSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Record> unencodable =
+        p.apply(
+            Create.of(new Record())
+                .withCoder(new RecordNotConsistentWithEqualsStructuralValueCoder()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Record>globally()).getProducingTransformInternal();
+
+    WindowedValue<Record> record = WindowedValue.<Record>valueInGlobalWindow(new Record());
+
+    CommittedBundle<Record> input =
+        InProcessBundle.unkeyed(unencodable).add(record).commit(Instant.now());
+    ModelEnforcement<Record> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(record);
+    enforcement.afterElement(record);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void structurallyEqualResultsSucceeds() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> unencodable = p.apply(Create.of(1).withCoder(VarIntCoder.of()));
+    AppliedPTransform<?, ?, ?> consumer =
+        unencodable.apply(Count.<Integer>globally()).getProducingTransformInternal();
+
+    WindowedValue<Integer> value = WindowedValue.valueInGlobalWindow(1);
+
+    CommittedBundle<Integer> input =
+        InProcessBundle.unkeyed(unencodable).add(value).commit(Instant.now());
+    ModelEnforcement<Integer> enforcement = factory.forBundle(input, consumer);
+
+    enforcement.beforeElement(value);
+    enforcement.afterElement(value);
+    enforcement.afterFinish(
+        input,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  private static class Record {}
+  private static class RecordNoEncodeCoder extends AtomicCoder<Record> {
+
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Encode not allowed");
+    }
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return null;
+    }
+  }
+
+  private static class RecordNoDecodeCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      throw new CoderException("Decode not allowed");
+    }
+  }
+
+  private static class RecordStructuralValueCoder extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return true;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+  private static class RecordNotConsistentWithEqualsStructuralValueCoder
+      extends AtomicCoder<Record> {
+    @Override
+    public void encode(
+        Record value,
+        OutputStream outStream,
+        com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {}
+
+    @Override
+    public Record decode(
+        InputStream inStream, com.google.cloud.dataflow.sdk.coders.Coder.Context context)
+        throws CoderException, IOException {
+      return new Record() {
+        @Override
+        public String toString() {
+          return "DecodedRecord";
+        }
+      };
+    }
+
+    @Override
+    public boolean consistentWithEquals() {
+      return false;
+    }
+
+    @Override
+    public Object structuralValue(Record value) {
+      return value;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4343e6f9/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
new file mode 100644
index 0000000..87e12ce
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.isA;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Collections;
+
+/**
+ * Tests for {@link ImmutabilityEnforcementFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ImmutabilityEnforcementFactoryTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private transient ImmutabilityEnforcementFactory factory;
+  private transient PCollection<byte[]> pcollection;
+  private transient AppliedPTransform<?, ?, ?> consumer;
+
+  @Before
+  public void setup() {
+    factory = new ImmutabilityEnforcementFactory();
+    TestPipeline p = TestPipeline.create();
+    pcollection =
+        p.apply(Create.of("foo".getBytes(), "spamhameggs".getBytes()))
+            .apply(
+                ParDo.of(
+                    new DoFn<byte[], byte[]>() {
+                      @Override
+                      public void processElement(DoFn<byte[], byte[]>.ProcessContext c)
+                          throws Exception {
+                        c.element()[0] = 'b';
+                      }
+                    }));
+    consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void unchangedSucceeds() {
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedDuringProcessElementThrows() {
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    element.getValue()[0] = 'f'; // Mutate between beforeElement() and afterElement().
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterElement(element);
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+
+  @Test
+  public void mutatedAfterProcessElementFails() {
+
+    WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow("bar".getBytes());
+    CommittedBundle<byte[]> elements =
+        InProcessBundle.unkeyed(pcollection).add(element).commit(Instant.now());
+
+    ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
+    enforcement.beforeElement(element);
+    enforcement.afterElement(element);
+
+    element.getValue()[0] = 'f'; // Mutate after afterElement() but before afterFinish().
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expectMessage(consumer.getFullName());
+    thrown.expectMessage("illegaly mutated");
+    thrown.expectMessage("Input values must not be mutated");
+    enforcement.afterFinish(
+        elements,
+        StepTransformResult.withoutHold(consumer).build(),
+        Collections.<CommittedBundle<?>>emptyList());
+  }
+}
+


[2/2] incubator-beam git commit: This closes #74

Posted by ke...@apache.org.
This closes #74


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f8f3745a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f8f3745a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f8f3745a

Branch: refs/heads/master
Commit: f8f3745a8c5685651d58b67ec26cb48ea56782a0
Parents: 4605051 4343e6f
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 10:18:59 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 10:18:59 2016 -0700

----------------------------------------------------------------------
 .../inprocess/AbstractModelEnforcement.java     |  36 +++
 .../EncodabilityEnforcementFactory.java         |  69 +++++
 .../ImmutabilityEnforcementFactory.java         |  94 +++++++
 .../sdk/runners/inprocess/ModelEnforcement.java |  61 +++++
 .../inprocess/ModelEnforcementFactory.java      |  28 ++
 .../EncodabilityEnforcementFactoryTest.java     | 260 +++++++++++++++++++
 .../ImmutabilityEnforcementFactoryTest.java     | 130 ++++++++++
 7 files changed, 678 insertions(+)
----------------------------------------------------------------------