You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/05 22:27:17 UTC
[2/2] beam git commit: Remove single-matcher replacement API
Remove single-matcher replacement API
This makes it more difficult for runner authors to use the discouraged
API that doesn't validate ordering of replacements.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/753bc9cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/753bc9cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/753bc9cc
Branch: refs/heads/master
Commit: 753bc9cc2bcfd130e1523eabf97d3de791c4cfd2
Parents: 9fb4fc3
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 4 10:20:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 15:26:56 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/runners/spark/TestSparkRunner.java | 10 ++++++----
.../core/src/main/java/org/apache/beam/sdk/Pipeline.java | 2 +-
2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index be9ff2e..988a82b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -217,10 +218,11 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
@VisibleForTesting
void adaptBoundedReads(Pipeline pipeline) {
- pipeline.replace(
- PTransformOverride.of(
- PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
- new AdaptedBoundedAsUnbounded.Factory()));
+ pipeline.replaceAll(
+ Collections.singletonList(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(BoundedReadFromUnboundedSource.class),
+ new AdaptedBoundedAsUnbounded.Factory())));
}
private static class AdaptedBoundedAsUnbounded<T> extends PTransform<PBegin, PCollection<T>> {
http://git-wip-us.apache.org/repos/asf/beam/blob/753bc9cc/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index fa8277f..11d781d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -228,7 +228,7 @@ public class Pipeline {
});
}
- public void replace(final PTransformOverride override) {
+ private void replace(final PTransformOverride override) {
final Collection<Node> matches = new ArrayList<>();
transforms.visit(
new PipelineVisitor.Defaults() {