You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/07 22:41:26 UTC
[2/3] incubator-beam git commit: Apply ModelEnforcement in the
InProcessPipelineRunner
Apply ModelEnforcement in the InProcessPipelineRunner
This ensures that user code does not violate the model.
Add a flag to control application of immutability enforcement. This flag
is enabled by default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/740242c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/740242c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/740242c3
Branch: refs/heads/master
Commit: 740242c330ce99916ed76af7c6e68638d1e3c0e3
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 16:48:15 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700
----------------------------------------------------------------------
.../inprocess/InProcessPipelineOptions.java | 9 +++++++
.../inprocess/InProcessPipelineRunner.java | 25 +++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index ae5b49b..d44ea78 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa
boolean isBlockOnRun();
void setBlockOnRun(boolean b);
+
+ @Default.Boolean(true)
+ @Description(
+ "Controls whether the runner should ensure that all of the elements of every "
+ + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+ + "at any point, or output elements after they are output.")
+ boolean isTestImmutability();
+
+ void setTestImmutability(boolean test);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..f5b7f3c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
@@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -269,11 +270,29 @@ public class InProcessPipelineRunner
private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
defaultModelEnforcements(InProcessPipelineOptions options) {
- return Collections.emptyMap();
+ ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ enforcements = ImmutableMap.builder();
+ Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+ enforcements.put(ParDo.Bound.class, parDoEnforcements);
+ enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+ return enforcements.build();
+ }
+
+ private Collection<ModelEnforcementFactory> createParDoEnforcements(
+ InProcessPipelineOptions options) {
+ ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+ if (options.isTestImmutability()) {
+ enforcements.add(ImmutabilityEnforcementFactory.create());
+ }
+ return enforcements.build();
}
private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
- return InProcessBundleFactory.create();
+ BundleFactory bundleFactory = InProcessBundleFactory.create();
+ if (pipelineOptions.isTestImmutability()) {
+ bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+ }
+ return bundleFactory;
}
/**