You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/05 22:27:17 UTC

[2/2] beam git commit: Remove single-matcher replacement API

Remove single-matcher replacement API

This makes it more difficult for runner authors to use the discouraged
API that doesn't validate ordering of replacements.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/753bc9cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/753bc9cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/753bc9cc

Branch: refs/heads/master
Commit: 753bc9cc2bcfd130e1523eabf97d3de791c4cfd2
Parents: 9fb4fc3
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 10:20:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 15:26:56 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/runners/spark/TestSparkRunner.java    | 10 ++++++----
 .../core/src/main/java/org/apache/beam/sdk/Pipeline.java  |  2 +-
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index be9ff2e..988a82b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -217,10 +218,11 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   @VisibleForTesting
   void adaptBoundedReads(Pipeline pipeline) {
-    pipeline.replace(
-        PTransformOverride.of(
-            PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
-            new AdaptedBoundedAsUnbounded.Factory()));
+    pipeline.replaceAll(
+        Collections.singletonList(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
+                new AdaptedBoundedAsUnbounded.Factory())));
   }
 
   private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index fa8277f..11d781d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -228,7 +228,7 @@ public class Pipeline {
         });
   }
 
-  public void replace(final PTransformOverride override) {
+  private void replace(final PTransformOverride override) {
     final Collection<Node> matches = new ArrayList<>();
     transforms.visit(
         new PipelineVisitor.Defaults() {