You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/03/28 23:28:16 UTC
[1/2] incubator-beam git commit: Move Model Enforcements into the
proper module
Repository: incubator-beam
Updated Branches:
refs/heads/master f8f3745a8 -> 4f91c2eae
Move Model Enforcements into the proper module
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/111e3932
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/111e3932
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/111e3932
Branch: refs/heads/master
Commit: 111e393219adfd7a41f8545f66a80c638dc38165
Parents: f8f3745
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 28 12:02:58 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Mar 28 13:46:04 2016 -0700
----------------------------------------------------------------------
.../inprocess/AbstractModelEnforcement.java | 36 --------
.../EncodabilityEnforcementFactory.java | 69 --------------
.../ImmutabilityEnforcementFactory.java | 94 --------------------
.../sdk/runners/inprocess/ModelEnforcement.java | 61 -------------
.../inprocess/ModelEnforcementFactory.java | 28 ------
.../inprocess/AbstractModelEnforcement.java | 36 ++++++++
.../EncodabilityEnforcementFactory.java | 69 ++++++++++++++
.../ImmutabilityEnforcementFactory.java | 94 ++++++++++++++++++++
.../sdk/runners/inprocess/ModelEnforcement.java | 61 +++++++++++++
.../inprocess/ModelEnforcementFactory.java | 28 ++++++
10 files changed, 288 insertions(+), 288 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
deleted file mode 100644
index 32b2a67..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-/**
- * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
- */
-abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
- @Override
- public void beforeElement(WindowedValue<T> element) {}
-
- @Override
- public void afterElement(WindowedValue<T> element) {}
-
- @Override
- public void afterFinish(
- CommittedBundle<T> input,
- InProcessTransformResult result,
- Iterable<? extends CommittedBundle<?>> outputs) {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
deleted file mode 100644
index 0e38b55..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Enforces that all elements in a {@link PCollection} can be encoded using that
- * {@link PCollection PCollection's} {@link Coder}.
- */
-class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
- public static EncodabilityEnforcementFactory create() {
- return new EncodabilityEnforcementFactory();
- }
-
- @Override
- public <T> ModelEnforcement<T> forBundle(
- CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
- return new EncodabilityEnforcement<>(input);
- }
-
- private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> {
- private Coder<T> coder;
-
- public EncodabilityEnforcement(CommittedBundle<T> input) {
- coder = SerializableUtils.clone(input.getPCollection().getCoder());
- }
-
- @Override
- public void beforeElement(WindowedValue<T> element) {
- try {
- T clone = CoderUtils.clone(coder, element.getValue());
- if (coder.consistentWithEquals()) {
- checkArgument(
- coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
- "Coder %s of class %s does not maintain structural value equality"
- + " on input element %s",
- coder,
- coder.getClass().getSimpleName(),
- element.getValue());
- }
- } catch (Exception e) {
- throw UserCodeException.wrap(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
deleted file mode 100644
index dfc56a9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
-import com.google.cloud.dataflow.sdk.util.MutationDetector;
-import com.google.cloud.dataflow.sdk.util.MutationDetectors;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.UserCodeException;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-
-import java.util.IdentityHashMap;
-import java.util.Map;
-
-/**
- * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
- * an element.
- *
- * <p>Implies {@link EncodabilityEnforcment}.
- */
-class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
- public static ModelEnforcementFactory create() {
- return new ImmutabilityEnforcementFactory();
- }
-
- @Override
- public <T> ModelEnforcement<T> forBundle(
- CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
- return new ImmutabilityCheckingEnforcement<T>(input, consumer);
- }
-
- private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
- private final AppliedPTransform<?, ?, ?> transform;
- private final Map<WindowedValue<T>, MutationDetector> mutationElements;
- private final Coder<T> coder;
-
- private ImmutabilityCheckingEnforcement(
- CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
- this.transform = transform;
- coder = SerializableUtils.clone(input.getPCollection().getCoder());
- mutationElements = new IdentityHashMap<>();
- }
-
- @Override
- public void beforeElement(WindowedValue<T> element) {
- try {
- mutationElements.put(
- element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
- } catch (CoderException e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- @Override
- public void afterFinish(
- CommittedBundle<T> input,
- InProcessTransformResult result,
- Iterable<? extends CommittedBundle<?>> outputs) {
- for (MutationDetector detector : mutationElements.values()) {
- try {
- detector.verifyUnmodified();
- } catch (IllegalMutationException e) {
- throw UserCodeException.wrap(
- new IllegalMutationException(
- String.format(
- "PTransform %s illegaly mutated value %s of class %s."
- + " Input values must not be mutated in any way.",
- transform.getFullName(),
- e.getSavedValue(),
- e.getSavedValue().getClass()),
- e.getSavedValue(),
- e.getNewValue()));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
deleted file mode 100644
index 66bea37..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Enforcement tools that verify that executing code conforms to the model.
- *
- * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
- * {@link ModelEnforcement} is provided with the input bundle as part of
- * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
- * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
- * output {@link InProcessTransformResult} and committed output bundles after the
- * {@link TransformEvaluator} has completed.
- *
- * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
- * of the input {@link PCollection} on construction, and then enforce per-element behavior
- * (such as the immutability of input elements). When the element is output or the bundle is
- * completed, the required conditions can be enforced across all elements.
- */
-public interface ModelEnforcement<T> {
- /**
- * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
- * provided {@link WindowedValue}.
- */
- void beforeElement(WindowedValue<T> element);
-
- /**
- * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
- * provided {@link WindowedValue}.
- */
- void afterElement(WindowedValue<T> element);
-
- /**
- * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
- * called, producing the provided {@link InProcessTransformResult} and
- * {@link CommittedBundle output bundles}.
- */
- void afterFinish(
- CommittedBundle<T> input,
- InProcessTransformResult result,
- Iterable<? extends CommittedBundle<?>> outputs);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
deleted file mode 100644
index 66c01b3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright (C) 2016 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.runners.inprocess;
-
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
-
-/**
- * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
- * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
- * {@link TransformEvaluator} is created.
- */
-public interface ModelEnforcementFactory {
- <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
new file mode 100644
index 0000000..32b2a67
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/AbstractModelEnforcement.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
+ */
+abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
+ @Override
+ public void beforeElement(WindowedValue<T> element) {}
+
+ @Override
+ public void afterElement(WindowedValue<T> element) {}
+
+ @Override
+ public void afterFinish(
+ CommittedBundle<T> input,
+ InProcessTransformResult result,
+ Iterable<? extends CommittedBundle<?>> outputs) {}
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
new file mode 100644
index 0000000..0e38b55
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EncodabilityEnforcementFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforces that all elements in a {@link PCollection} can be encoded using that
+ * {@link PCollection PCollection's} {@link Coder}.
+ */
+class EncodabilityEnforcementFactory implements ModelEnforcementFactory {
+ public static EncodabilityEnforcementFactory create() {
+ return new EncodabilityEnforcementFactory();
+ }
+
+ @Override
+ public <T> ModelEnforcement<T> forBundle(
+ CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+ return new EncodabilityEnforcement<>(input);
+ }
+
+ private static class EncodabilityEnforcement<T> extends AbstractModelEnforcement<T> {
+ private Coder<T> coder;
+
+ public EncodabilityEnforcement(CommittedBundle<T> input) {
+ coder = SerializableUtils.clone(input.getPCollection().getCoder());
+ }
+
+ @Override
+ public void beforeElement(WindowedValue<T> element) {
+ try {
+ T clone = CoderUtils.clone(coder, element.getValue());
+ if (coder.consistentWithEquals()) {
+ checkArgument(
+ coder.structuralValue(element.getValue()).equals(coder.structuralValue(clone)),
+ "Coder %s of class %s does not maintain structural value equality"
+ + " on input element %s",
+ coder,
+ coder.getClass().getSimpleName(),
+ element.getValue());
+ }
+ } catch (Exception e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
new file mode 100644
index 0000000..dfc56a9
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityEnforcementFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.IllegalMutationException;
+import com.google.cloud.dataflow.sdk.util.MutationDetector;
+import com.google.cloud.dataflow.sdk.util.MutationDetectors;
+import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.UserCodeException;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+import java.util.IdentityHashMap;
+import java.util.Map;
+
+/**
+ * {@link ModelEnforcement} that enforces elements are not modified over the course of processing
+ * an element.
+ *
+ * <p>Implies {@link EncodabilityEnforcementFactory}.
+ */
+class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
+ public static ModelEnforcementFactory create() {
+ return new ImmutabilityEnforcementFactory();
+ }
+
+ @Override
+ public <T> ModelEnforcement<T> forBundle(
+ CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+ return new ImmutabilityCheckingEnforcement<T>(input, consumer);
+ }
+
+ private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
+ private final AppliedPTransform<?, ?, ?> transform;
+ private final Map<WindowedValue<T>, MutationDetector> mutationElements;
+ private final Coder<T> coder;
+
+ private ImmutabilityCheckingEnforcement(
+ CommittedBundle<T> input, AppliedPTransform<?, ?, ?> transform) {
+ this.transform = transform;
+ coder = SerializableUtils.clone(input.getPCollection().getCoder());
+ mutationElements = new IdentityHashMap<>();
+ }
+
+ @Override
+ public void beforeElement(WindowedValue<T> element) {
+ try {
+ mutationElements.put(
+ element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
+ } catch (CoderException e) {
+ throw UserCodeException.wrap(e);
+ }
+ }
+
+ @Override
+ public void afterFinish(
+ CommittedBundle<T> input,
+ InProcessTransformResult result,
+ Iterable<? extends CommittedBundle<?>> outputs) {
+ for (MutationDetector detector : mutationElements.values()) {
+ try {
+ detector.verifyUnmodified();
+ } catch (IllegalMutationException e) {
+ throw UserCodeException.wrap(
+ new IllegalMutationException(
+ String.format(
+ "PTransform %s illegaly mutated value %s of class %s."
+ + " Input values must not be mutated in any way.",
+ transform.getFullName(),
+ e.getSavedValue(),
+ e.getSavedValue().getClass()),
+ e.getSavedValue(),
+ e.getNewValue()));
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
new file mode 100644
index 0000000..66bea37
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcement.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
+ * of the input {@link PCollection}) on construction, and then enforce per-element behavior
+ * (such as the immutability of input elements). When the element is output or the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ */
+public interface ModelEnforcement<T> {
+ /**
+ * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+ * provided {@link WindowedValue}.
+ */
+ void beforeElement(WindowedValue<T> element);
+
+ /**
+ * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+ * provided {@link WindowedValue}.
+ */
+ void afterElement(WindowedValue<T> element);
+
+ /**
+ * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
+ * called, producing the provided {@link InProcessTransformResult} and
+ * {@link CommittedBundle output bundles}.
+ */
+ void afterFinish(
+ CommittedBundle<T> input,
+ InProcessTransformResult result,
+ Iterable<? extends CommittedBundle<?>> outputs);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/111e3932/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
new file mode 100644
index 0000000..66c01b3
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ModelEnforcementFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+ <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
+}
[2/2] incubator-beam git commit: This closes #84
Posted by ke...@apache.org.
This closes #84
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f91c2ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f91c2ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f91c2ea
Branch: refs/heads/master
Commit: 4f91c2eaee97cec80e37c33cec04079bc8389653
Parents: f8f3745 111e393
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 14:28:04 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 14:28:04 2016 -0700
----------------------------------------------------------------------
.../inprocess/AbstractModelEnforcement.java | 36 --------
.../EncodabilityEnforcementFactory.java | 69 --------------
.../ImmutabilityEnforcementFactory.java | 94 --------------------
.../sdk/runners/inprocess/ModelEnforcement.java | 61 -------------
.../inprocess/ModelEnforcementFactory.java | 28 ------
.../inprocess/AbstractModelEnforcement.java | 36 ++++++++
.../EncodabilityEnforcementFactory.java | 69 ++++++++++++++
.../ImmutabilityEnforcementFactory.java | 94 ++++++++++++++++++++
.../sdk/runners/inprocess/ModelEnforcement.java | 61 +++++++++++++
.../inprocess/ModelEnforcementFactory.java | 28 ++++++
10 files changed, 288 insertions(+), 288 deletions(-)
----------------------------------------------------------------------