You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/28 23:28:16 UTC

[1/2] incubator-beam git commit: Move Model Enforcements into the proper module

Repository: incubator-beam
Updated Branches:
  refs/heads/master f8f3745a8 -> 4f91c2eae


Move Model Enforcements into the proper module


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/111e3932
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/111e3932
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/111e3932

Branch: refs/heads/master
Commit: 111e393219adfd7a41f8545f66a80c638dc38165
Parents: f8f3745
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 28 12:02:58 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 28 13:46:04 2016 -0700

----------------------------------------------------------------------
 .../inprocess/AbstractModelEnforcement.java     | 36 --------
 .../EncodabilityEnforcementFactory.java         | 69 --------------
 .../ImmutabilityEnforcementFactory.java         | 94 --------------------
 .../sdk/runners/inprocess/ModelEnforcement.java | 61 -------------
 .../inprocess/ModelEnforcementFactory.java      | 28 ------
 .../inprocess/AbstractModelEnforcement.java     | 36 ++++++++
 .../EncodabilityEnforcementFactory.java         | 69 ++++++++++++++
 .../ImmutabilityEnforcementFactory.java         | 94 ++++++++++++++++++++
 .../sdk/runners/inprocess/ModelEnforcement.java | 61 +++++++++++++
 .../inprocess/ModelEnforcementFactory.java      | 28 ++++++
 10 files changed, 288 insertions(+), 288 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
deleted file mode 100644
index 32b2a67..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
- */
-abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
-  @Override
-  public void beforeElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterElement(WindowedValue<T> element) {}
-
-  @Override
-  public void afterFinish(
-      CommittedBundle<T> input,
-      InProcessTransformResult result,
-      Iterable<? extends CommittedBundle<?>> outputs) {}
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
deleted file mode 100644
index 0e38b55..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Enforces that all elements in a {@link PCollection} can be encoded using that
- * {@link PCollection PCollection's} {@link Coder}.
- */
-class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
-  public static EncodabilityEnforcementFactory create() {
-    return new EncodabilityEnforcementFactory();
-  }
-
-  @Override
-  public <T> ModelEnforcement<T> forBundle(
-      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
-    return new EncodabilityEnforcement<>(input);
-  }
-
-  private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> {
-    private Coder<T> coder;
-
-    public EncodabilityEnforcement(CommittedBundle<T> input) {
-      coder = SerializableUtils.clone(input.getPCollection().getCoder());
-    }
-
-    @Override
-    public void beforeElement(WindowedValue<T> element) {
-      try {
-        T clone = CoderUtils.clone(coder, element.getValue());
-        if (coder.consistentWithEquals()) {
-          checkArgument(
-              coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
-              "Coder %s of class %s does not maintain structural value equality"
-                  + " on input element %s",
-              coder,
-              coder.getClass().getSimpleName(),
-              element.getValue());
-        }
-      } catch (Exception e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
deleted file mode 100644
index dfc56a9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.MutationDetector;
-import com.google.cloud.dataflow.sdk.util.MutationDetectors;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-import java.util.IdentityHashMap;
-import java.util.Map;
-
-/**
- * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
- * an element.
- *
- * <p>Implies {@link EncodabilityEnforcment}.
- */
-class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
-  public static ModelEnforcementFactory create() {
-    return new ImmutabilityEnforcementFactory();
-  }
-
-  @Override
-  public <T> ModelEnforcement<T> forBundle(
-      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
-    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
-  }
-
-  private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
-    private final AppliedPTransform<?, ?, ?> transform;
-    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
-    private final Coder<T> coder;
-
-    private ImmutabilityCheckingEnforcement(
-        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
-      this.transform = transform;
-      coder = SerializableUtils.clone(input.getPCollection().getCoder());
-      mutationElements = new IdentityHashMap<>();
-    }
-
-    @Override
-    public void beforeElement(WindowedValue<T> element) {
-      try {
-        mutationElements.put(
-            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
-      } catch (CoderException e) {
-        throw UserCodeException.wrap(e);
-      }
-    }
-
-    @Override
-    public void afterFinish(
-        CommittedBundle<T> input,
-        InProcessTransformResult result,
-        Iterable<? extends CommittedBundle<?>> outputs) {
-      for (MutationDetector detector : mutationElements.values()) {
-        try {
-          detector.verifyUnmodified();
-        } catch (IllegalMutationException e) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s illegaly mutated value %s of class %s."
-                          + " Input values must not be mutated in any way.",
-                      transform.getFullName(),
-                      e.getSavedValue(),
-                      e.getSavedValue().getClass()),
-                  e.getSavedValue(),
-                  e.getNewValue()));
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
deleted file mode 100644
index 66bea37..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Enforcement tools that verify that executing code conforms to the model.
- *
- * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
- * {@link ModelEnforcement} is provided with the input bundle as part of
- * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
- * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
- * output {@link InProcessTransformResult} and committed output bundles after the
- * {@link TransformEvaluator} has completed.
- *
- * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
- * of the input {@link PCollection} on construction, and then enforce per-element behavior
- * (such as the immutability of input elements). When the element is output or the bundle is
- * completed, the required conditions can be enforced across all elements.
- */
-public interface ModelEnforcement<T> {
-  /**
-   * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
-   * provided {@link WindowedValue}.
-   */
-  void beforeElement(WindowedValue<T> element);
-
-  /**
-   * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
-   * provided {@link WindowedValue}.
-   */
-  void afterElement(WindowedValue<T> element);
-
-  /**
-   * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
-   * called, producing the provided {@link InProcessTransformResult} and
-   * {@link CommittedBundle output bundles}.
-   */
-  void afterFinish(
-      CommittedBundle<T> input,
-      InProcessTransformResult result,
-      Iterable<? extends CommittedBundle<?>> outputs);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
deleted file mode 100644
index 66c01b3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-
-/**
- * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
- * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
- * {@link TransformEvaluator} is created.
- */
-public interface ModelEnforcementFactory {
-  <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
new file mode 100644
index 0000000..32b2a67
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
+ */
+abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
+  @Override
+  public void beforeElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterElement(WindowedValue<T> element) {}
+
+  @Override
+  public void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
new file mode 100644
index 0000000..0e38b55
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforces that all elements in a {@link PCollection} can be encoded using that
+ * {@link PCollection PCollection's} {@link Coder}.
+ */
+class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static EncodabilityEnforcementFactory create() {
+    return new EncodabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new EncodabilityEnforcement<>(input);
+  }
+
+  private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> {
+    private Coder<T> coder;
+
+    public EncodabilityEnforcement(CommittedBundle<T> input) {
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        T clone = CoderUtils.clone(coder, element.getValue());
+        if (coder.consistentWithEquals()) {
+          checkArgument(
+              coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
+              "Coder %s of class %s does not maintain structural value equality"
+                  + " on input element %s",
+              coder,
+              coder.getClass().getSimpleName(),
+              element.getValue());
+        }
+      } catch (Exception e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..dfc56a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
+ * an element.
+ *
+ * <p>Implies {@link EncodabilityEnforcment}.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+  public static ModelEnforcementFactory create() {
+    return new ImmutabilityEnforcementFactory();
+  }
+
+  @Override
+  public <T> ModelEnforcement<T> forBundle(
+      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+    return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+  }
+
+  private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
+    private final AppliedPTransform<?, ?, ?> transform;
+    private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+    private final Coder<T> coder;
+
+    private ImmutabilityCheckingEnforcement(
+        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+      this.transform = transform;
+      coder = SerializableUtils.clone(input.getPCollection().getCoder());
+      mutationElements = new IdentityHashMap<>();
+    }
+
+    @Override
+    public void beforeElement(WindowedValue<T> element) {
+      try {
+        mutationElements.put(
+            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+      } catch (CoderException e) {
+        throw UserCodeException.wrap(e);
+      }
+    }
+
+    @Override
+    public void afterFinish(
+        CommittedBundle<T> input,
+        InProcessTransformResult result,
+        Iterable<? extends CommittedBundle<?>> outputs) {
+      for (MutationDetector detector : mutationElements.values()) {
+        try {
+          detector.verifyUnmodified();
+        } catch (IllegalMutationException e) {
+          throw UserCodeException.wrap(
+              new IllegalMutationException(
+                  String.format(
+                      "PTransform %s illegaly mutated value %s of class %s."
+                          + " Input values must not be mutated in any way.",
+                      transform.getFullName(),
+                      e.getSavedValue(),
+                      e.getSavedValue().getClass()),
+                  e.getSavedValue(),
+                  e.getNewValue()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
new file mode 100644
index 0000000..66bea37
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
+ * of the input {@link PCollection} on construction, and then enforce per-element behavior
+ * (such as the immutability of input elements). When the element is output or the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ */
+public interface ModelEnforcement<T> {
+  /**
+   * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void beforeElement(WindowedValue<T> element);
+
+  /**
+   * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+   * provided {@link WindowedValue}.
+   */
+  void afterElement(WindowedValue<T> element);
+
+  /**
+   * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
+   * called, producing the provided {@link InProcessTransformResult} and
+   * {@link CommittedBundle output bundles}.
+   */
+  void afterFinish(
+      CommittedBundle<T> input,
+      InProcessTransformResult result,
+      Iterable<? extends CommittedBundle<?>> outputs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
new file mode 100644
index 0000000..66c01b3
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+  <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
+}


[2/2] incubator-beam git commit: This closes #84

Posted by ke...@apache.org.
This closes #84


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f91c2ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f91c2ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f91c2ea

Branch: refs/heads/master
Commit: 4f91c2eaee97cec80e37c33cec04079bc8389653
Parents: f8f3745 111e393
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 14:28:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 14:28:04 2016 -0700

----------------------------------------------------------------------
 .../inprocess/AbstractModelEnforcement.java     | 36 --------
 .../EncodabilityEnforcementFactory.java         | 69 --------------
 .../ImmutabilityEnforcementFactory.java         | 94 --------------------
 .../sdk/runners/inprocess/ModelEnforcement.java | 61 -------------
 .../inprocess/ModelEnforcementFactory.java      | 28 ------
 .../inprocess/AbstractModelEnforcement.java     | 36 ++++++++
 .../EncodabilityEnforcementFactory.java         | 69 ++++++++++++++
 .../ImmutabilityEnforcementFactory.java         | 94 ++++++++++++++++++++
 .../sdk/runners/inprocess/ModelEnforcement.java | 61 +++++++++++++
 .../inprocess/ModelEnforcementFactory.java      | 28 ++++++
 10 files changed, 288 insertions(+), 288 deletions(-)
----------------------------------------------------------------------