You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/14 13:30:51 UTC

[beam] branch master updated: [BEAM-6327] move pipeline trimming logic from Flink runner to core construction

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 43bee0c  [BEAM-6327] move pipeline trimming logic from Flink runner to core construction
     new 273fd39  Merge pull request #8011 [BEAM-6327] Pipeline trimming logic in core construction.
43bee0c is described below

commit 43bee0c2832f26852644440a23182de4a8bfc55c
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Mar 7 10:00:37 2019 -0800

    [BEAM-6327] move pipeline trimming logic from Flink runner to core construction
---
 .../core/construction/graph/PipelineTrimmer.java   | 78 ++++++++++++++++++++++
 .../beam/runners/flink/FlinkPipelineRunner.java    | 40 +----------
 2 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
new file mode 100644
index 0000000..f3edaad
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO(BEAM-6327): Remove the need for this.
+
+/** PipelineTrimmer removes subcomponents of native transforms that shouldn't be fused. */
+public class PipelineTrimmer {
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
+
+  /**
+   * Remove subcomponents of native transforms that shouldn't be fused.
+   *
+   * <p>AssignWindows is never trimmed, even if in {@code knownUrns}: the fuser should fuse it.
+   *
+   * @param pipeline the pipeline to be trimmed
+   * @param knownUrns set of URNs for the runner's native transforms
+   * @return a trimmed copy of {@code pipeline}; the input proto itself is not modified
+   */
+  public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
+    return makeKnownUrnsPrimitives(
+        pipeline,
+        Sets.difference(
+            knownUrns, ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
+  }
+
+  /** Returns a copy of {@code pipeline} in which known-URN transforms have no descendants. */
+  private static RunnerApi.Pipeline makeKnownUrnsPrimitives(
+      RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
+    RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
+    for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
+      if (knownUrns.contains(
+          pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
+        LOG.debug("Removing descendants of known PTransform {}", ptransformId);
+        removeDescendants(trimmedPipeline, ptransformId);
+      }
+    }
+    return trimmedPipeline.build();
+  }
+
+  /** Removes every descendant of {@code parentId}, if present, and clears its subtransforms. */
+  private static void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
+    RunnerApi.PTransform parentProto =
+        pipeline.getComponents().getTransformsOrDefault(parentId, null);
+    if (parentProto != null) {
+      for (String childId : parentProto.getSubtransformsList()) {
+        removeDescendants(pipeline, childId);
+        pipeline.getComponentsBuilder().removeTransforms(childId);
+      }
+      RunnerApi.PTransform trimmedParent = parentProto.toBuilder().clearSubtransforms().build();
+      pipeline.getComponentsBuilder().putTransforms(parentId, trimmedParent);
+    }
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 9ccd973..c340e23 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -21,20 +21,17 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,13 +79,7 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
     LOG.info("Translating pipeline to Flink program.");
 
     // Don't let the fuser fuse any subcomponents of native transforms.
-    // TODO(BEAM-6327): Remove the need for this.
-    RunnerApi.Pipeline trimmedPipeline =
-        makeKnownUrnsPrimitives(
-            pipeline,
-            Sets.difference(
-                translator.knownUrns(),
-                ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
+    Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());
 
     // Fused pipeline proto.
     // TODO: Consider supporting partially-fused graphs.
@@ -113,33 +104,6 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
     return FlinkRunner.createPipelineResult(result, pipelineOptions);
   }
 
-  private RunnerApi.Pipeline makeKnownUrnsPrimitives(
-      RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
-    RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
-    for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
-      if (knownUrns.contains(
-          pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
-        LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
-        removeDescendants(trimmedPipeline, ptransformId);
-      }
-    }
-    return trimmedPipeline.build();
-  }
-
-  private void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
-    RunnerApi.PTransform parentProto =
-        pipeline.getComponents().getTransformsOrDefault(parentId, null);
-    if (parentProto != null) {
-      for (String childId : parentProto.getSubtransformsList()) {
-        removeDescendants(pipeline, childId);
-        pipeline.getComponentsBuilder().removeTransforms(childId);
-      }
-      pipeline
-          .getComponentsBuilder()
-          .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
-    }
-  }
-
   /** Indicates whether the given pipeline has any unbounded PCollections. */
   private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
     checkNotNull(pipeline);