You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/23 01:13:25 UTC
[1/3] beam git commit: Port the DirectRunner to the Batch Surgery API
Repository: beam
Updated Branches:
refs/heads/master d6f6351f1 -> 75b6567f6
Port the DirectRunner to the Batch Surgery API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdc2eddb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdc2eddb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdc2eddb
Branch: refs/heads/master
Commit: fdc2eddb633ed0e0dde80948d4588757e7a552e6
Parents: 85af898
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 16:39:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 60 +++++++++++---------
1 file changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fdc2eddb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 94f0521..62df6c8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -44,9 +45,7 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -260,10 +259,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@Override
public DirectPipelineResult run(Pipeline pipeline) {
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- defaultTransformOverrides().entrySet()) {
- pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
- }
+ pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);
@@ -321,27 +317,37 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
* iteration order based on the order at which elements are added to it.
*/
@SuppressWarnings("rawtypes")
- private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() {
- return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
- .put(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new WriteWithShardingFactory()) /* Uses a view internally. */
- .put(
- PTransformMatchers.classEqualTo(CreatePCollectionView.class),
- new ViewOverrideFactory()) /* Uses pardos and GBKs */
- .put(
- PTransformMatchers.classEqualTo(TestStream.class),
- new DirectTestStreamFactory(this)) /* primitive */
- // SplittableParDo is implemented in terms of GroupByKeys and Primitives
- .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
- // state and timer ParDos are implemented in terms of GroupByKeys and Primitives
- .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
- .put(
- PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
- new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
- .put(
- PTransformMatchers.classEqualTo(GroupByKey.class),
- new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+ private List<PTransformOverride> defaultTransformOverrides() {
+ return ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new WriteWithShardingFactory())) /* Uses a view internally. */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+ new ViewOverrideFactory())) /* Uses pardos and GBKs */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(TestStream.class),
+ new DirectTestStreamFactory(this))) /* primitive */
+ // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
+ // primitives
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+ // state and timer ParDos are implemented in terms of simple ParDos and extra primitives
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+ new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(GroupByKey.class),
+ new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
.build();
}
[2/3] beam git commit: Provide methods for validating Surgery completion
Posted by jk...@apache.org.
Provide methods for validating Surgery completion
Pipeline#replaceAll takes a list of PTransformOverrides, applies them in the
order of the list, and then validates that no override still matches any node.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85af8981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85af8981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85af8981
Branch: refs/heads/master
Commit: 85af898132ec5c528a96d5c213c08cee91fa6538
Parents: d6f6351
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 16:17:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 3 +-
.../beam/runners/direct/DirectRunner.java | 3 +-
.../flink/FlinkStreamingPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../beam/runners/spark/TestSparkRunner.java | 6 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 66 ++++++++-
.../beam/sdk/runners/PTransformOverride.java | 44 ++++++
.../java/org/apache/beam/sdk/PipelineTest.java | 138 +++++++++++++++++++
8 files changed, 255 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 010ede3..79a2dd7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Combine;
@@ -114,7 +115,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
public ApexRunnerResult run(final Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
getOverrides().entrySet()) {
- pipeline.replace(override.getKey(), override.getValue());
+ pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4992c6a..94f0521 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
@@ -261,7 +262,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
public DirectPipelineResult run(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
defaultTransformOverrides().entrySet()) {
- pipeline.replace(override.getKey(), override.getValue());
+ pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 9ab1310..d50d6bf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactor
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Combine;
@@ -95,7 +96,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
transformOverrides.entrySet()) {
- pipeline.replace(override.getKey(), override.getValue());
+ pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
super.translate(pipeline);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9398d72..718a1e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -93,6 +93,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -666,7 +667,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
- pipeline.replace(override.getKey(), override.getValue());
+ pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e436422..e40534f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.PAssert;
@@ -207,8 +208,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
@VisibleForTesting
void adaptBoundedReads(Pipeline pipeline) {
pipeline.replace(
- PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
- new AdaptedBoundedAsUnbounded.Factory());
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
+ new AdaptedBoundedAsUnbounded.Factory()));
}
private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 2f368b1..f980a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -19,9 +19,12 @@ package org.apache.beam.sdk;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -29,7 +32,7 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
import org.apache.beam.sdk.runners.PipelineRunner;
@@ -172,14 +175,67 @@ public class Pipeline {
return begin().apply(name, root);
}
+ /**
+ * Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are
+ * applied in the order they are present within the list.
+ *
+ * <p>After all nodes are replaced, ensures that no nodes in the updated graph match any of the
+ * overrides.
+ */
+ public void replaceAll(List<PTransformOverride> overrides) {
+ for (PTransformOverride override : overrides) {
+ replace(override);
+ }
+ checkNoMoreMatches(overrides);
+ }
+
+ private void checkNoMoreMatches(final List<PTransformOverride> overrides) {
+ traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ SetMultimap<Node, PTransformOverride> matched = HashMultimap.create();
+
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (!node.isRootNode()) {
+ for (PTransformOverride override : overrides) {
+ if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ matched.put(node, override);
+ }
+ }
+ }
+ if (!matched.containsKey(node)) {
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (node.isRootNode()) {
+ checkState(
+ matched.isEmpty(), "Found nodes that matched overrides. Matches: %s", matched);
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ for (PTransformOverride override : overrides) {
+ if (override.getMatcher().matches(node.toAppliedPTransform())) {
+ matched.put(node, override);
+ }
+ }
+ }
+ });
+ }
+
public void replace(
- final PTransformMatcher matcher, PTransformOverrideFactory replacementFactory) {
+ final PTransformOverride override) {
final Collection<Node> matches = new ArrayList<>();
transforms.visit(
new PipelineVisitor.Defaults() {
@Override
public CompositeBehavior enterCompositeTransform(Node node) {
- if (!node.isRootNode() && matcher.matches(node.toAppliedPTransform())) {
+ if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) {
matches.add(node);
// This node will be replaced. It should not be visited.
return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
@@ -189,13 +245,13 @@ public class Pipeline {
@Override
public void visitPrimitiveTransform(Node node) {
- if (matcher.matches(node.toAppliedPTransform())) {
+ if (override.getMatcher().matches(node.toAppliedPTransform())) {
matches.add(node);
}
}
});
for (Node match : matches) {
- applyReplacement(match, replacementFactory);
+ applyReplacement(match, override.getOverrideFactory());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
new file mode 100644
index 0000000..33b9114
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.runners;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link PTransformMatcher} and associated {@link PTransformOverrideFactory} to replace all
+ * matching {@link PTransform PTransforms}.
+ */
+@AutoValue
+public abstract class PTransformOverride {
+ public static PTransformOverride of(
+ PTransformMatcher matcher, PTransformOverrideFactory factory) {
+ return new AutoValue_PTransformOverride(matcher, factory);
+ }
+
+ /**
+ * Gets the {@link PTransformMatcher} to identify {@link PTransform PTransforms} to replace.
+ */
+ public abstract PTransformMatcher getMatcher();
+
+ /**
+ * Gets the {@link PTransformOverrideFactory} of this override.
+ */
+ public abstract PTransformOverrideFactory getOverrideFactory();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index d8e4ef4..7e5cc35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -17,37 +17,56 @@
*/
package org.apache.beam.sdk;
+import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
+import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -290,4 +309,123 @@ public class PipelineTest {
public void testEmptyPipeline() throws Exception {
pipeline.run();
}
+
+ @Test
+ public void testReplaceAll() {
+ pipeline.enableAbandonedNodeEnforcement(false);
+ pipeline.apply(CountingInput.unbounded());
+ pipeline.apply(CountingInput.upTo(100L));
+
+ pipeline.replaceAll(
+ ImmutableList.of(
+ PTransformOverride.of(
+ new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return application.getTransform() instanceof UnboundedCountingInput;
+ }
+ },
+ new UnboundedCountingInputOverride()),
+ PTransformOverride.of(
+ new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return application.getTransform() instanceof BoundedCountingInput;
+ }
+ },
+ new BoundedCountingInputOverride())));
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public CompositeBehavior enterCompositeTransform(Node node) {
+ if (!node.isRootNode()) {
+ assertThat(
+ node.getTransform().getClass(),
+ not(
+ anyOf(
+ Matchers.<Class<? extends PTransform>>equalTo(
+ UnboundedCountingInput.class),
+ Matchers.<Class<? extends PTransform>>equalTo(
+ BoundedCountingInput.class))));
+ }
+ return CompositeBehavior.ENTER_TRANSFORM;
+ }
+ });
+ }
+
+ /**
+ * Tests that {@link Pipeline#replaceAll(List)} throws when one of the PTransformOverrides still
+ * matches after all replacements have been applied.
+ */
+ @Test
+ public void testReplaceAllIncomplete() {
+ pipeline.enableAbandonedNodeEnforcement(false);
+ pipeline.apply(CountingInput.unbounded());
+
+ // The order is such that the output of the second will match the first, which is not permitted
+ thrown.expect(IllegalStateException.class);
+ pipeline.replaceAll(
+ ImmutableList.of(
+ PTransformOverride.of(
+ new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return application.getTransform() instanceof BoundedCountingInput;
+ }
+ },
+ new BoundedCountingInputOverride()),
+ PTransformOverride.of(
+ new PTransformMatcher() {
+ @Override
+ public boolean matches(AppliedPTransform<?, ?, ?> application) {
+ return application.getTransform() instanceof UnboundedCountingInput;
+ }
+ },
+ new UnboundedCountingInputOverride())));
+ }
+
+ static class BoundedCountingInputOverride
+ implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> {
+ @Override
+ public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
+ BoundedCountingInput transform) {
+ return Create.of(0L);
+ }
+
+ @Override
+ public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+ return p.begin();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+ return Collections.<PValue, ReplacementOutput>singletonMap(
+ newOutput,
+ ReplacementOutput.of(
+ Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+ }
+ }
+ static class UnboundedCountingInputOverride
+ implements PTransformOverrideFactory<PBegin, PCollection<Long>, UnboundedCountingInput> {
+ @Override
+ public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
+ UnboundedCountingInput transform) {
+ return CountingInput.upTo(100L);
+ }
+
+ @Override
+ public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+ return p.begin();
+ }
+
+ @Override
+ public Map<PValue, ReplacementOutput> mapOutputs(
+ List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+ return Collections.<PValue, ReplacementOutput>singletonMap(
+ newOutput,
+ ReplacementOutput.of(
+ Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+ }
+ }
}
[3/3] beam git commit: This closes #2274
Posted by jk...@apache.org.
This closes #2274
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75b6567f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75b6567f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75b6567f
Branch: refs/heads/master
Commit: 75b6567f6f883af3707b0092dfbe3585b7d8b7bf
Parents: d6f6351 fdc2edd
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 22 18:12:37 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:12:37 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 3 +-
.../beam/runners/direct/DirectRunner.java | 61 ++++----
.../flink/FlinkStreamingPipelineTranslator.java | 3 +-
.../beam/runners/dataflow/DataflowRunner.java | 3 +-
.../beam/runners/spark/TestSparkRunner.java | 6 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 66 ++++++++-
.../beam/sdk/runners/PTransformOverride.java | 44 ++++++
.../java/org/apache/beam/sdk/PipelineTest.java | 138 +++++++++++++++++++
8 files changed, 287 insertions(+), 37 deletions(-)
----------------------------------------------------------------------