You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/14 13:30:51 UTC
[beam] branch master updated: [BEAM-6327] move pipeline trimming logic from Flink runner to core construction
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 43bee0c [BEAM-6327] move pipeline trimming logic from Flink runner to core construction
new 273fd39 Merge pull request #8011 [BEAM-6327] Pipeline trimming logic in core construction.
43bee0c is described below
commit 43bee0c2832f26852644440a23182de4a8bfc55c
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Thu Mar 7 10:00:37 2019 -0800
[BEAM-6327] move pipeline trimming logic from Flink runner to core construction
---
.../core/construction/graph/PipelineTrimmer.java | 78 ++++++++++++++++++++++
.../beam/runners/flink/FlinkPipelineRunner.java | 40 +----------
2 files changed, 80 insertions(+), 38 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
new file mode 100644
index 0000000..f3edaad
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/PipelineTrimmer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO(BEAM-6327): Remove the need for this.
+
+/**
+ * Trims a pipeline so that the fuser does not fuse subcomponents of a runner's native transforms.
+ *
+ * <p>Every transform whose URN the runner translates natively has all of its descendant transforms
+ * removed from the pipeline components and its subtransform list cleared, so it appears to the
+ * fuser as a leaf transform.
+ */
+public final class PipelineTrimmer {
+  private static final Logger LOG = LoggerFactory.getLogger(PipelineTrimmer.class);
+
+  /** Static utility class; not meant to be instantiated. */
+  private PipelineTrimmer() {}
+
+  /**
+   * Removes the subcomponents of native transforms that shouldn't be fused.
+   *
+   * <p>Transforms with {@link PTransformTranslation#ASSIGN_WINDOWS_TRANSFORM_URN} are never
+   * trimmed, even when that URN is in {@code knownUrns}, because the fuser should fuse
+   * AssignWindows into the graph itself.
+   *
+   * @param pipeline the pipeline to be trimmed; the input proto itself is not modified
+   * @param knownUrns set of URNs for the runner's native transforms
+   * @return a trimmed copy of {@code pipeline}
+   */
+  public static Pipeline trim(Pipeline pipeline, Set<String> knownUrns) {
+    // The fuser should fuse AssignWindows into the graph, so we don't handle it here.
+    Set<String> urnsToTrim =
+        Sets.difference(
+            knownUrns, ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN));
+    return makeKnownUrnsPrimitives(pipeline, urnsToTrim);
+  }
+
+  /**
+   * Returns a copy of {@code pipeline} in which every transform whose URN is in {@code knownUrns}
+   * has had its descendants removed and its subtransform list cleared.
+   */
+  private static Pipeline makeKnownUrnsPrimitives(Pipeline pipeline, Set<String> knownUrns) {
+    RunnerApi.Components components = pipeline.getComponents();
+    Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
+    // Iterate over the original, unmodified components; removals only touch the builder copy.
+    for (String ptransformId : components.getTransformsMap().keySet()) {
+      RunnerApi.PTransform transform = components.getTransformsOrThrow(ptransformId);
+      if (knownUrns.contains(transform.getSpec().getUrn())) {
+        // Pass the id as an argument so SLF4J substitutes it for the {} placeholder.
+        LOG.debug("Removing descendants of known PTransform {}", ptransformId);
+        removeDescendants(trimmedPipeline, ptransformId);
+      }
+    }
+    return trimmedPipeline.build();
+  }
+
+  /**
+   * Recursively removes all descendants of {@code parentId} from {@code pipeline} and clears the
+   * parent's subtransform list. Does nothing if {@code parentId} is no longer present, e.g. because
+   * it was already removed as a descendant of another known transform.
+   */
+  private static void removeDescendants(Pipeline.Builder pipeline, String parentId) {
+    // Read and write through the components builder directly; calling getComponents() on the
+    // pipeline builder after each modification may rebuild the Components message every time.
+    RunnerApi.Components.Builder components = pipeline.getComponentsBuilder();
+    RunnerApi.PTransform parentProto = components.getTransformsOrDefault(parentId, null);
+    if (parentProto == null) {
+      return;
+    }
+    for (String childId : parentProto.getSubtransformsList()) {
+      removeDescendants(pipeline, childId);
+      components.removeTransforms(childId);
+    }
+    components.putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 9ccd973..c340e23 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -21,20 +21,17 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import java.util.Collection;
import java.util.List;
-import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.flink.api.common.JobExecutionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,13 +79,7 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
LOG.info("Translating pipeline to Flink program.");
// Don't let the fuser fuse any subcomponents of native transforms.
- // TODO(BEAM-6327): Remove the need for this.
- RunnerApi.Pipeline trimmedPipeline =
- makeKnownUrnsPrimitives(
- pipeline,
- Sets.difference(
- translator.knownUrns(),
- ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
+ Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());
// Fused pipeline proto.
// TODO: Consider supporting partially-fused graphs.
@@ -113,33 +104,6 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
return FlinkRunner.createPipelineResult(result, pipelineOptions);
}
- private RunnerApi.Pipeline makeKnownUrnsPrimitives(
- RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
- RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
- for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
- if (knownUrns.contains(
- pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
- LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
- removeDescendants(trimmedPipeline, ptransformId);
- }
- }
- return trimmedPipeline.build();
- }
-
- private void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
- RunnerApi.PTransform parentProto =
- pipeline.getComponents().getTransformsOrDefault(parentId, null);
- if (parentProto != null) {
- for (String childId : parentProto.getSubtransformsList()) {
- removeDescendants(pipeline, childId);
- pipeline.getComponentsBuilder().removeTransforms(childId);
- }
- pipeline
- .getComponentsBuilder()
- .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
- }
- }
-
/** Indicates whether the given pipeline has any unbounded PCollections. */
private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
checkNotNull(pipeline);