You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/28 23:00:51 UTC

[beam] 01/01: [BEAM-7282] portable Spark: don't fuse already optimized graph

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch dont-fuse
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 091c4781678ff1d2fbe3a7a685bb291125c97315
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon May 13 13:46:19 2019 -0700

    [BEAM-7282] portable Spark: don't fuse already optimized graph
---
 .../org/apache/beam/runners/spark/SparkPipelineRunner.java     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 0e8a649..b0a1063 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
@@ -58,7 +59,14 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
 
     // Don't let the fuser fuse any subcomponents of native transforms.
     Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());
-    Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
+    // Fused pipeline proto: the trimmed pipeline as-is if it already has an ExecutableStage.
+    // TODO: Consider supporting partially-fused graphs.
+    RunnerApi.Pipeline fusedPipeline =
+        trimmedPipeline.getComponents().getTransformsMap().values().stream()
+                .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+            ? trimmedPipeline
+            : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
 
     if (pipelineOptions.getFilesToStage() == null) {
       pipelineOptions.setFilesToStage(