You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/07 22:41:25 UTC
[1/3] incubator-beam git commit: Improve ImmutabilityEnforcement
Repository: incubator-beam
Updated Branches:
refs/heads/master 1451a0ec9 -> 2b3216bbe
Improve ImmutabilityEnforcement
Check for mutation after each element, so that failures within a call to
ProcessElement are caught more quickly.
Move the wrapping of exceptions thrown during calls to ProcessElement
into ParDoInProcessEvaluator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/150eac59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/150eac59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/150eac59
Branch: refs/heads/master
Commit: 150eac594a265a700771e07f07a54b802c5c4776
Parents: 740242c
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 5 13:19:24 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700
----------------------------------------------------------------------
.../ImmutabilityEnforcementFactory.java | 36 ++++++++++++--------
.../inprocess/ParDoInProcessEvaluator.java | 13 +++++--
.../ImmutabilityEnforcementFactoryTest.java | 9 ++---
.../inprocess/TransformExecutorTest.java | 6 ++--
4 files changed, 38 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
index 2e4c07b..8b7ccba 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -71,25 +71,33 @@ class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
}
@Override
+ public void afterElement(WindowedValue<T> element) {
+ verifyUnmodified(mutationElements.get(element));
+ }
+
+ @Override
public void afterFinish(
CommittedBundle<T> input,
InProcessTransformResult result,
Iterable<? extends CommittedBundle<?>> outputs) {
for (MutationDetector detector : mutationElements.values()) {
- try {
- detector.verifyUnmodified();
- } catch (IllegalMutationException e) {
- throw UserCodeException.wrap(
- new IllegalMutationException(
- String.format(
- "PTransform %s illegaly mutated value %s of class %s."
- + " Input values must not be mutated in any way.",
- transform.getFullName(),
- e.getSavedValue(),
- e.getSavedValue().getClass()),
- e.getSavedValue(),
- e.getNewValue()));
- }
+ verifyUnmodified(detector);
+ }
+ }
+
+ private void verifyUnmodified(MutationDetector detector) {
+ try {
+ detector.verifyUnmodified();
+ } catch (IllegalMutationException e) {
+ throw new IllegalMutationException(
+ String.format(
+ "PTransform %s illegaly mutated value %s of class %s."
+ + " Input values must not be mutated in any way.",
+ transform.getFullName(),
+ e.getSavedValue(),
+ e.getSavedValue().getClass()),
+ e.getSavedValue(),
+ e.getNewValue());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index 3942bff..4b4d699 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.DoFnRunners.OutputManager;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
@@ -56,12 +57,20 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
@Override
public void processElement(WindowedValue<T> element) {
- fnRunner.processElement(element);
+ try {
+ fnRunner.processElement(element);
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
}
@Override
public InProcessTransformResult finishBundle() {
- fnRunner.finishBundle();
+ try {
+ fnRunner.finishBundle();
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
StepTransformResult.Builder resultBuilder;
CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
if (state != null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
index e65b178..ec779c0 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactoryTest.java
@@ -17,8 +17,6 @@
*/
package com.google.cloud.dataflow.sdk.runners.inprocess;
-import static org.hamcrest.Matchers.isA;
-
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
@@ -27,7 +25,6 @@ import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -94,8 +91,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
ModelEnforcement<byte[]> enforcement = factory.forBundle(elements, consumer);
enforcement.beforeElement(element);
element.getValue()[0] = 'f';
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expect(IllegalMutationException.class);
thrown.expectMessage(consumer.getFullName());
thrown.expectMessage("illegaly mutated");
thrown.expectMessage("Input values must not be mutated");
@@ -118,8 +114,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable {
enforcement.afterElement(element);
element.getValue()[0] = 'f';
- thrown.expect(UserCodeException.class);
- thrown.expectCause(isA(IllegalMutationException.class));
+ thrown.expect(IllegalMutationException.class);
thrown.expectMessage(consumer.getFullName());
thrown.expectMessage("illegaly mutated");
thrown.expectMessage("Input values must not be mutated");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/150eac59/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
index b029dd3..7e87515 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutorTest.java
@@ -34,7 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.WithKeys;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -413,7 +413,7 @@ public class TransformExecutorTest {
fooBytes.getValue()[0] = 'b';
evaluatorLatch.countDown();
- thrown.expectCause(isA(UserCodeException.class));
+ thrown.expectCause(isA(IllegalMutationException.class));
task.get();
}
@@ -472,7 +472,7 @@ public class TransformExecutorTest {
fooBytes.getValue()[0] = 'b';
evaluatorLatch.countDown();
- thrown.expectCause(isA(UserCodeException.class));
+ thrown.expectCause(isA(IllegalMutationException.class));
task.get();
}
[2/3] incubator-beam git commit: Apply ModelEnforcement in the InProcessPipelineRunner
Posted by ke...@apache.org.
Apply ModelEnforcement in the InProcessPipelineRunner
This ensures that user code does not violate the model.
Add a flag to control application of immutability enforcement. This flag
is enabled by default.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/740242c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/740242c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/740242c3
Branch: refs/heads/master
Commit: 740242c330ce99916ed76af7c6e68638d1e3c0e3
Parents: 363d4ec
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 4 16:48:15 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Apr 7 09:03:11 2016 -0700
----------------------------------------------------------------------
.../inprocess/InProcessPipelineOptions.java | 9 +++++++
.../inprocess/InProcessPipelineRunner.java | 25 +++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
index ae5b49b..d44ea78 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineOptions.java
@@ -89,4 +89,13 @@ public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNa
boolean isBlockOnRun();
void setBlockOnRun(boolean b);
+
+ @Default.Boolean(true)
+ @Description(
+ "Controls whether the runner should ensure that all of the elements of every "
+ + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+ + "at any point, or output elements after they are output.")
+ boolean isTestImmutability();
+
+ void setTestImmutability(boolean test);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/740242c3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
index 4fb01b7..f5b7f3c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java
@@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View.CreatePCollectionView;
import com.google.cloud.dataflow.sdk.util.InstanceBuilder;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
@@ -48,13 +49,13 @@ import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -269,11 +270,29 @@ public class InProcessPipelineRunner
private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
defaultModelEnforcements(InProcessPipelineOptions options) {
- return Collections.emptyMap();
+ ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ enforcements = ImmutableMap.builder();
+ Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+ enforcements.put(ParDo.Bound.class, parDoEnforcements);
+ enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+ return enforcements.build();
+ }
+
+ private Collection<ModelEnforcementFactory> createParDoEnforcements(
+ InProcessPipelineOptions options) {
+ ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+ if (options.isTestImmutability()) {
+ enforcements.add(ImmutabilityEnforcementFactory.create());
+ }
+ return enforcements.build();
}
private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
- return InProcessBundleFactory.create();
+ BundleFactory bundleFactory = InProcessBundleFactory.create();
+ if (pipelineOptions.isTestImmutability()) {
+ bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+ }
+ return bundleFactory;
}
/**
[3/3] incubator-beam git commit: This closes #129
Posted by ke...@apache.org.
This closes #129
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b3216bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b3216bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b3216bb
Branch: refs/heads/master
Commit: 2b3216bbe5c39ca33ee39bc8f8786efc2db140f9
Parents: 1451a0e 150eac5
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 7 13:40:55 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Apr 7 13:40:55 2016 -0700
----------------------------------------------------------------------
.../ImmutabilityEnforcementFactory.java | 36 ++++++++++++--------
.../inprocess/InProcessPipelineOptions.java | 9 +++++
.../inprocess/InProcessPipelineRunner.java | 25 ++++++++++++--
.../inprocess/ParDoInProcessEvaluator.java | 13 +++++--
.../ImmutabilityEnforcementFactoryTest.java | 9 ++---
.../inprocess/TransformExecutorTest.java | 6 ++--
6 files changed, 69 insertions(+), 29 deletions(-)
----------------------------------------------------------------------