You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/07 22:41:25 UTC

[1/3] incubator-beam git commit: Improve ImmutabilityEnforcement

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1451a0ec9 -> 2b3216bbe


Improve ImmutabilityEnforcement

Check for mutation after each element, so that an illegal mutation made
within a call to ProcessElement is caught more quickly.

Move the wrapping of exceptions thrown during calls to ProcessElement
into ParDoInProcessEvaluator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/150eac59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/150eac59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/150eac59

Branch: refs/heads/master
Commit: 150eac594a265a700771e07f07a54b802c5c4776
Parents: 740242c
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 5 13:19:24 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700

----------------------------------------------------------------------
 .../ImmutabilityEnforcementFactory.java         | 36 ++++++++++++--------
 .../inprocess/ParDoInProcessEvaluator.java      | 13 +++++--
 .../ImmutabilityEnforcementFactoryTest.java     |  9 ++---
 .../inprocess/TransformExecutorTest.java        |  6 ++--
 4 files changed, 38 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
index 2e4c07b..8b7ccba 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -71,25 +71,33 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
     }
 
     @Override
+    public void afterElement(WindowedValue<T> element) {
+      verifyUnmodified(mutationElements.get(element));
+    }
+
+    @Override
     public void afterFinish(
         CommittedBundle<T> input,
         InProcessTransformResult result,
         Iterable<? extends CommittedBundle<?>> outputs) {
       for (MutationDetector detector : mutationElements.values()) {
-        try {
-          detector.verifyUnmodified();
-        } catch (IllegalMutationException e) {
-          throw UserCodeException.wrap(
-              new IllegalMutationException(
-                  String.format(
-                      "PTransform %s illegaly mutated value %s of class %s."
-                          + " Input values must not be mutated in any way.",
-                      transform.getFullName(),
-                      e.getSavedValue(),
-                      e.getSavedValue().getClass()),
-                  e.getSavedValue(),
-                  e.getNewValue()));
-        }
+        verifyUnmodified(detector);
+      }
+    }
+
+    private void verifyUnmodified(MutationDetector detector) {
+      try {
+        detector.verifyUnmodified();
+      } catch (IllegalMutationException e) {
+        throw new IllegalMutationException(
+            String.format(
+                "PTransform %s illegaly mutated value %s of class %s."
+                    + " Input values must not be mutated in any way.",
+                transform.getFullName(),
+                e.getSavedValue(),
+                e.getSavedValue().getClass()),
+            e.getSavedValue(),
+            e.getNewValue());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index 3942bff..4b4d699 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.util.DoFnRunner;
 import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -56,12 +57,20 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
 
   @Override
   public void processElement(WindowedValue<T> element) {
-    fnRunner.processElement(element);
+    try {
+      fnRunner.processElement(element);
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
   }
 
   @Override
   public InProcessTransformResult finishBundle() {
-    fnRunner.finishBundle();
+    try {
+      fnRunner.finishBundle();
+    } catch (Exception e) {
+      throw UserCodeException.wrap(e);
+    }
     StepTransformResult.Builder resultBuilder;
     CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
     if (state != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
index e65b178..ec779c0 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -17,8 +17,6 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
-import static org.hamcrest.Matchers.isA;
-
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -27,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
@@ -94,8 +91,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
     enforcement.beforeElement(element);
     element.getValue()[0] = 'f';
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage(consumer.getFullName());
     thrown.expectMessage("illegaly mutated");
     thrown.expectMessage("Input values must not be mutated");
@@ -118,8 +114,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
     enforcement.afterElement(element);
 
     element.getValue()[0] = 'f';
-    thrown.expect(UserCodeException.class);
-    thrown.expectCause(isA(IllegalMutationException.class));
+    thrown.expect(IllegalMutationException.class);
     thrown.expectMessage(consumer.getFullName());
     thrown.expectMessage("illegaly mutated");
     thrown.expectMessage("Input values must not be mutated");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
index b029dd3..7e87515 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -34,7 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -413,7 +413,7 @@ public class TransformExecutorTest {
     fooBytes.getValue()[0] = 'b';
     evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(UserCodeException.class));
+    thrown.expectCause(isA(IllegalMutationException.class));
     task.get();
   }
 
@@ -472,7 +472,7 @@ public class TransformExecutorTest {
     fooBytes.getValue()[0] = 'b';
     evaluatorLatch.countDown();
 
-    thrown.expectCause(isA(UserCodeException.class));
+    thrown.expectCause(isA(IllegalMutationException.class));
     task.get();
   }
 


[2/3] incubator-beam git commit: Apply ModelEnforcement in the InProcessPipelineRunner

Posted by ke...@apache.org.
Apply ModelEnforcement in the InProcessPipelineRunner

This ensures that user code does not violate the model.

Add a flag to control application of immutability enforcement. This flag
is enabled by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/740242c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/740242c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/740242c3

Branch: refs/heads/master
Commit: 740242c330ce99916ed76af7c6e68638d1e3c0e3
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 16:48:15 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InProcessPipelineOptions.java     |  9 +++++++
 .../inprocess/InProcessPipelineRunner.java      | 25 +++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index ae5b49b..d44ea78 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa
   boolean isBlockOnRun();
 
   void setBlockOnRun(boolean b);
+
+  @Default.Boolean(true)
+  @Description(
+      "Controls whether the runner should ensure that all of the elements of every "
+          + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+          + "at any point, or output elements after they are output.")
+  boolean isTestImmutability();
+
+  void setTestImmutability(boolean test);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..f5b7f3c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
@@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.joda.time.Instant;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -269,11 +270,29 @@ public class InProcessPipelineRunner
 
   private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
       defaultModelEnforcements(InProcessPipelineOptions options) {
-    return Collections.emptyMap();
+    ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+        enforcements = ImmutableMap.builder();
+    Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+    enforcements.put(ParDo.Bound.class, parDoEnforcements);
+    enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+    return enforcements.build();
+  }
+
+  private Collection<ModelEnforcementFactory> createParDoEnforcements(
+      InProcessPipelineOptions options) {
+    ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+    if (options.isTestImmutability()) {
+      enforcements.add(ImmutabilityEnforcementFactory.create());
+    }
+    return enforcements.build();
   }
 
   private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
-    return InProcessBundleFactory.create();
+    BundleFactory bundleFactory = InProcessBundleFactory.create();
+    if (pipelineOptions.isTestImmutability()) {
+      bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+    }
+    return bundleFactory;
   }
 
   /**


[3/3] incubator-beam git commit: This closes #129

Posted by ke...@apache.org.
This closes #129


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b3216bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b3216bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b3216bb

Branch: refs/heads/master
Commit: 2b3216bbe5c39ca33ee39bc8f8786efc2db140f9
Parents: 1451a0e 150eac5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 7 13:40:55 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 7 13:40:55 2016 -0700

----------------------------------------------------------------------
 .../ImmutabilityEnforcementFactory.java         | 36 ++++++++++++--------
 .../inprocess/InProcessPipelineOptions.java     |  9 +++++
 .../inprocess/InProcessPipelineRunner.java      | 25 ++++++++++++--
 .../inprocess/ParDoInProcessEvaluator.java      | 13 +++++--
 .../ImmutabilityEnforcementFactoryTest.java     |  9 ++---
 .../inprocess/TransformExecutorTest.java        |  6 ++--
 6 files changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------