You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/07 22:41:26 UTC

[2/3] incubator-beam git commit: Apply ModelEnforcement in the InProcessPipelineRunner

Apply ModelEnforcement in the InProcessPipelineRunner

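This ensures that user code does not violate the Beam model, for example by mutating input elements, or by mutating output elements after they have been output.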

Add a testImmutability pipeline option that controls whether immutability
enforcement is applied. This option is enabled by default.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/740242c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/740242c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/740242c3

Branch: refs/heads/master
Commit: 740242c330ce99916ed76af7c6e68638d1e3c0e3
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 16:48:15 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InProcessPipelineOptions.java     |  9 +++++++
 .../inprocess/InProcessPipelineRunner.java      | 25 +++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index ae5b49b..d44ea78 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa
   boolean isBlockOnRun();
 
   void setBlockOnRun(boolean b);
+
+  @Default.Boolean(true)
+  @Description(
+      "Controls whether the runner should ensure that all of the elements of every "
+          + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+          + "at any point, or output elements after they are output.")
+  boolean isTestImmutability();
+
+  void setTestImmutability(boolean test);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..f5b7f3c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
 import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
@@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import org.joda.time.Instant;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -269,11 +270,29 @@ public class InProcessPipelineRunner
 
   private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
       defaultModelEnforcements(InProcessPipelineOptions options) {
-    return Collections.emptyMap();
+    ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+        enforcements = ImmutableMap.builder();
+    Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+    enforcements.put(ParDo.Bound.class, parDoEnforcements);
+    enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+    return enforcements.build();
+  }
+
+  private Collection<ModelEnforcementFactory> createParDoEnforcements(
+      InProcessPipelineOptions options) {
+    ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+    if (options.isTestImmutability()) {
+      enforcements.add(ImmutabilityEnforcementFactory.create());
+    }
+    return enforcements.build();
   }
 
   private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
-    return InProcessBundleFactory.create();
+    BundleFactory bundleFactory = InProcessBundleFactory.create();
+    if (pipelineOptions.isTestImmutability()) {
+      bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+    }
+    return bundleFactory;
   }
 
   /**