You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/23 01:13:25 UTC

[1/3] beam git commit: Port the DirectRunner to the Batch Surgery API

Repository: beam
Updated Branches:
  refs/heads/master d6f6351f1 -> 75b6567f6


Port the DirectRunner to the Batch Surgery API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdc2eddb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdc2eddb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdc2eddb

Branch: refs/heads/master
Commit: fdc2eddb633ed0e0dde80948d4588757e7a552e6
Parents: 85af898
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 16:39:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 60 +++++++++++---------
 1 file changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdc2eddb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 94f0521..62df6c8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -44,9 +45,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -260,10 +259,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
 
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        defaultTransformOverrides().entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+    pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
     pipeline.traverseTopologically(graphVisitor);
@@ -321,27 +317,37 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
    * iteration order based on the order at which elements are added to it.
    */
   @SuppressWarnings("rawtypes")
-  private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() {
-    return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-        .put(
-            PTransformMatchers.writeWithRunnerDeterminedSharding(),
-            new WriteWithShardingFactory()) /* Uses a view internally. */
-        .put(
-            PTransformMatchers.classEqualTo(CreatePCollectionView.class),
-            new ViewOverrideFactory()) /* Uses pardos and GBKs */
-        .put(
-            PTransformMatchers.classEqualTo(TestStream.class),
-            new DirectTestStreamFactory(this)) /* primitive */
-        // SplittableParDo is implemented in terms of GroupByKeys and Primitives
-        .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
-        // state and timer ParDos are implemented in terms of GroupByKeys and Primitives
-        .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
-        .put(
-            PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
-            new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
-        .put(
-            PTransformMatchers.classEqualTo(GroupByKey.class),
-            new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+  private List<PTransformOverride> defaultTransformOverrides() {
+    return ImmutableList.<PTransformOverride>builder()
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.writeWithRunnerDeterminedSharding(),
+                new WriteWithShardingFactory())) /* Uses a view internally. */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                new ViewOverrideFactory())) /* Uses pardos and GBKs */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(TestStream.class),
+                new DirectTestStreamFactory(this))) /* primitive */
+        // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
+        // primitives
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+        // state and timer pardos are implemented in terms of simple ParDos and extra primitives
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+                new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(GroupByKey.class),
+                new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
         .build();
   }
 


[2/3] beam git commit: Provide methods for validating Surgery completion

Posted by jk...@apache.org.
Provide methods for validating Surgery completion

This takes a list, applies all overrides in the order of the list, and
validates that no more overrides are applicable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85af8981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85af8981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85af8981

Branch: refs/heads/master
Commit: 85af898132ec5c528a96d5c213c08cee91fa6538
Parents: d6f6351
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 17 16:17:20 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   3 +-
 .../beam/runners/direct/DirectRunner.java       |   3 +-
 .../flink/FlinkStreamingPipelineTranslator.java |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   3 +-
 .../beam/runners/spark/TestSparkRunner.java     |   6 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  66 ++++++++-
 .../beam/sdk/runners/PTransformOverride.java    |  44 ++++++
 .../java/org/apache/beam/sdk/PipelineTest.java  | 138 +++++++++++++++++++
 8 files changed, 255 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 010ede3..79a2dd7 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
@@ -114,7 +115,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   public ApexRunnerResult run(final Pipeline pipeline) {
     for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
         getOverrides().entrySet()) {
-      pipeline.replace(override.getKey(), override.getValue());
+      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
     }
 
     final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4992c6a..94f0521 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
@@ -261,7 +262,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   public DirectPipelineResult run(Pipeline pipeline) {
     for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
         defaultTransformOverrides().entrySet()) {
-      pipeline.replace(override.getKey(), override.getValue());
+      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
     }
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 9ab1310..d50d6bf 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -24,6 +24,7 @@ import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactor
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
@@ -95,7 +96,7 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
     for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
         transformOverrides.entrySet()) {
-      pipeline.replace(override.getKey(), override.getValue());
+      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
     }
     super.translate(pipeline);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9398d72..718a1e3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -93,6 +93,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
@@ -666,7 +667,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   void replaceTransforms(Pipeline pipeline) {
     for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
-      pipeline.replace(override.getKey(), override.getValue());
+      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index e436422..e40534f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -207,8 +208,9 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
   @VisibleForTesting
   void adaptBoundedReads(Pipeline pipeline) {
     pipeline.replace(
-        PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
-        new AdaptedBoundedAsUnbounded.Factory());
+        PTransformOverride.of(
+            PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
+            new AdaptedBoundedAsUnbounded.Factory()));
   }
 
   private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 2f368b1..f980a0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -19,9 +19,12 @@ package org.apache.beam.sdk;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.coders.CoderRegistry;
@@ -29,7 +32,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -172,14 +175,67 @@ public class Pipeline {
     return begin().apply(name, root);
   }
 
+  /**
+   * Replaces all nodes that match a {@link PTransformOverride} in this pipeline. Overrides are
+   * applied in the order they are present within the list.
+   *
+   * <p>After all nodes are replaced, ensures that no nodes in the updated graph match any of the
+   * overrides.
+   */
+  public void replaceAll(List<PTransformOverride> overrides) {
+    for (PTransformOverride override : overrides) {
+      replace(override);
+    }
+    checkNoMoreMatches(overrides);
+  }
+
+  private void checkNoMoreMatches(final List<PTransformOverride> overrides) {
+    traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          SetMultimap<Node, PTransformOverride> matched = HashMultimap.create();
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (!node.isRootNode()) {
+              for (PTransformOverride override : overrides) {
+                if (override.getMatcher().matches(node.toAppliedPTransform())) {
+                  matched.put(node, override);
+                }
+              }
+            }
+            if (!matched.containsKey(node)) {
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+            return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (node.isRootNode()) {
+              checkState(
+                  matched.isEmpty(), "Found nodes that matched overrides. Matches: %s", matched);
+            }
+          }
+
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            for (PTransformOverride override : overrides) {
+              if (override.getMatcher().matches(node.toAppliedPTransform())) {
+                matched.put(node, override);
+              }
+            }
+          }
+        });
+  }
+
   public void replace(
-      final PTransformMatcher matcher, PTransformOverrideFactory replacementFactory) {
+      final PTransformOverride override) {
     final Collection<Node> matches = new ArrayList<>();
     transforms.visit(
         new PipelineVisitor.Defaults() {
           @Override
           public CompositeBehavior enterCompositeTransform(Node node) {
-            if (!node.isRootNode() && matcher.matches(node.toAppliedPTransform())) {
+            if (!node.isRootNode() && override.getMatcher().matches(node.toAppliedPTransform())) {
               matches.add(node);
               // This node will be replaced. It should not be visited.
               return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
@@ -189,13 +245,13 @@ public class Pipeline {
 
           @Override
           public void visitPrimitiveTransform(Node node) {
-            if (matcher.matches(node.toAppliedPTransform())) {
+            if (override.getMatcher().matches(node.toAppliedPTransform())) {
               matches.add(node);
             }
           }
         });
     for (Node match : matches) {
-      applyReplacement(match, replacementFactory);
+      applyReplacement(match, override.getOverrideFactory());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
new file mode 100644
index 0000000..33b9114
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.runners;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.transforms.PTransform;
+
+/**
+ * A {@link PTransformMatcher} and associated {@link PTransformOverrideFactory} to replace all
+ * matching {@link PTransform PTransforms}.
+ */
+@AutoValue
+public abstract class PTransformOverride {
+  public static PTransformOverride of(
+      PTransformMatcher matcher, PTransformOverrideFactory factory) {
+    return new AutoValue_PTransformOverride(matcher, factory);
+  }
+
+  /**
+   * Gets the {@link PTransformMatcher} to identify {@link PTransform PTransforms} to replace.
+   */
+  public abstract PTransformMatcher getMatcher();
+
+  /**
+   * Gets the {@link PTransformOverrideFactory} of this override.
+   */
+  public abstract PTransformOverrideFactory getOverrideFactory();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/85af8981/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index d8e4ef4..7e5cc35 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -17,37 +17,56 @@
  */
 package org.apache.beam.sdk;
 
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput;
+import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.CrashingRunner;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -290,4 +309,123 @@ public class PipelineTest {
   public void testEmptyPipeline() throws Exception {
     pipeline.run();
   }
+
+  @Test
+  public void testReplaceAll() {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    pipeline.apply(CountingInput.unbounded());
+    pipeline.apply(CountingInput.upTo(100L));
+
+    pipeline.replaceAll(
+        ImmutableList.of(
+            PTransformOverride.of(
+                new PTransformMatcher() {
+                  @Override
+                  public boolean matches(AppliedPTransform<?, ?, ?> application) {
+                    return application.getTransform() instanceof UnboundedCountingInput;
+                  }
+                },
+                new UnboundedCountingInputOverride()),
+            PTransformOverride.of(
+                new PTransformMatcher() {
+                  @Override
+                  public boolean matches(AppliedPTransform<?, ?, ?> application) {
+                    return application.getTransform() instanceof BoundedCountingInput;
+                  }
+                },
+                new BoundedCountingInputOverride())));
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (!node.isRootNode()) {
+              assertThat(
+                  node.getTransform().getClass(),
+                  not(
+                      anyOf(
+                          Matchers.<Class<? extends PTransform>>equalTo(
+                              UnboundedCountingInput.class),
+                          Matchers.<Class<? extends PTransform>>equalTo(
+                              BoundedCountingInput.class))));
+            }
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+        });
+  }
+
+  /**
+   * Tests that {@link Pipeline#replaceAll(List)} throws when one of the PTransformOverride still
+   * matches.
+   */
+  @Test
+  public void testReplaceAllIncomplete() {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    pipeline.apply(CountingInput.unbounded());
+
+    // The order is such that the output of the second will match the first, which is not permitted
+    thrown.expect(IllegalStateException.class);
+    pipeline.replaceAll(
+        ImmutableList.of(
+            PTransformOverride.of(
+                new PTransformMatcher() {
+                  @Override
+                  public boolean matches(AppliedPTransform<?, ?, ?> application) {
+                    return application.getTransform() instanceof BoundedCountingInput;
+                  }
+                },
+                new BoundedCountingInputOverride()),
+            PTransformOverride.of(
+                new PTransformMatcher() {
+                  @Override
+                  public boolean matches(AppliedPTransform<?, ?, ?> application) {
+                    return application.getTransform() instanceof UnboundedCountingInput;
+                  }
+                },
+                new UnboundedCountingInputOverride())));
+  }
+
+  static class BoundedCountingInputOverride
+      implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> {
+    @Override
+    public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
+        BoundedCountingInput transform) {
+      return Create.of(0L);
+    }
+
+    @Override
+    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+      return p.begin();
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+      return Collections.<PValue, ReplacementOutput>singletonMap(
+          newOutput,
+          ReplacementOutput.of(
+              Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+    }
+  }
+  static class UnboundedCountingInputOverride
+      implements PTransformOverrideFactory<PBegin, PCollection<Long>, UnboundedCountingInput> {
+    @Override
+    public PTransform<PBegin, PCollection<Long>> getReplacementTransform(
+        UnboundedCountingInput transform) {
+      return CountingInput.upTo(100L);
+    }
+
+    @Override
+    public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) {
+      return p.begin();
+    }
+
+    @Override
+    public Map<PValue, ReplacementOutput> mapOutputs(
+        List<TaggedPValue> outputs, PCollection<Long> newOutput) {
+      return Collections.<PValue, ReplacementOutput>singletonMap(
+          newOutput,
+          ReplacementOutput.of(
+              Iterables.getOnlyElement(outputs), Iterables.getOnlyElement(newOutput.expand())));
+    }
+  }
 }


[3/3] beam git commit: This closes #2274

Posted by jk...@apache.org.
This closes #2274


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/75b6567f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/75b6567f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/75b6567f

Branch: refs/heads/master
Commit: 75b6567f6f883af3707b0092dfbe3585b7d8b7bf
Parents: d6f6351 fdc2edd
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 22 18:12:37 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Mar 22 18:12:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    |   3 +-
 .../beam/runners/direct/DirectRunner.java       |  61 ++++----
 .../flink/FlinkStreamingPipelineTranslator.java |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   3 +-
 .../beam/runners/spark/TestSparkRunner.java     |   6 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  66 ++++++++-
 .../beam/sdk/runners/PTransformOverride.java    |  44 ++++++
 .../java/org/apache/beam/sdk/PipelineTest.java  | 138 +++++++++++++++++++
 8 files changed, 287 insertions(+), 37 deletions(-)
----------------------------------------------------------------------