You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/05/28 23:00:50 UTC

[beam] branch dont-fuse created (now 091c478)

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a change to branch dont-fuse
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 091c478  [BEAM-7282] portable Spark: don't fuse already optimized graph

This branch includes the following new commits:

     new 091c478  [BEAM-7282] portable Spark: don't fuse already optimized graph

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-7282] portable Spark: don't fuse already optimized graph

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch dont-fuse
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 091c4781678ff1d2fbe3a7a685bb291125c97315
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon May 13 13:46:19 2019 -0700

    [BEAM-7282] portable Spark: don't fuse already optimized graph
---
 .../org/apache/beam/runners/spark/SparkPipelineRunner.java     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 0e8a649..b0a1063 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
@@ -58,7 +59,14 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
 
     // Don't let the fuser fuse any subcomponents of native transforms.
     Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());
-    Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
+    // Fused pipeline proto.
+    // TODO: Consider supporting partially-fused graphs.
+    RunnerApi.Pipeline fusedPipeline =
+        trimmedPipeline.getComponents().getTransformsMap().values().stream()
+                .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+            ? trimmedPipeline
+            : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
 
     if (pipelineOptions.getFilesToStage() == null) {
       pipelineOptions.setFilesToStage(