You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/09 08:08:29 UTC
[beam] 01/01: Revert "[BEAM-6294] Use Flink rebalance for shuffle."
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch revert-7360-reshuffle-flink
in repository https://gitbox.apache.org/repos/asf/beam.git
commit cb33143baf0fe22502ea490518eae048904339dd
Author: Thomas Weise <tw...@users.noreply.github.com>
AuthorDate: Wed Jan 9 00:08:15 2019 -0800
Revert "[BEAM-6294] Use Flink rebalance for shuffle."
---
.../FlinkBatchPortablePipelineTranslator.java | 34 +--------
.../beam/runners/flink/FlinkJobInvocation.java | 82 ++++++----------------
.../flink/FlinkPortablePipelineTranslator.java | 19 +----
.../FlinkStreamingPortablePipelineTranslator.java | 23 +-----
sdks/python/apache_beam/transforms/util.py | 8 ---
5 files changed, 29 insertions(+), 137 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b182832..4a09d86 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTrans
import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
-import com.google.auto.service.AutoService;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -44,7 +43,6 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
-import org.apache.beam.runners.core.construction.NativeTransforms;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -83,7 +81,6 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -122,8 +119,7 @@ public class FlinkBatchPortablePipelineTranslator
* Creates a batch translation context. The resulting Flink execution dag will live in a new
* {@link ExecutionEnvironment}.
*/
- @Override
- public BatchTranslationContext createTranslationContext(
+ public static BatchTranslationContext createTranslationContext(
JobInfo jobInfo,
FlinkPipelineOptions pipelineOptions,
@Nullable String confDir,
@@ -162,8 +158,7 @@ public class FlinkBatchPortablePipelineTranslator
* flink {@link ExecutionEnvironment} that the execution plan will be applied to.
*/
public static class BatchTranslationContext
- implements FlinkPortablePipelineTranslator.TranslationContext,
- FlinkPortablePipelineTranslator.Executor {
+ implements FlinkPortablePipelineTranslator.TranslationContext {
private final JobInfo jobInfo;
private final FlinkPipelineOptions options;
@@ -190,11 +185,6 @@ public class FlinkBatchPortablePipelineTranslator
return options;
}
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- return getExecutionEnvironment().execute(jobName);
- }
-
public ExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
@@ -239,23 +229,7 @@ public class FlinkBatchPortablePipelineTranslator
}
@Override
- public Set<String> knownUrns() {
- return urnToTransformTranslator.keySet();
- }
-
- /** Predicate to determine whether a URN is a Flink native transform. */
- @AutoService(NativeTransforms.IsNativeTransform.class)
- public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
- @Override
- public boolean test(RunnerApi.PTransform pTransform) {
- return PTransformTranslation.RESHUFFLE_URN.equals(
- PTransformTranslation.urnForTransformOrNull(pTransform));
- }
- }
-
- @Override
- public FlinkPortablePipelineTranslator.Executor translate(
- BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
+ public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
// Use a QueryablePipeline to traverse transforms topologically.
QueryablePipeline p =
QueryablePipeline.forTransforms(
@@ -272,8 +246,6 @@ public class FlinkBatchPortablePipelineTranslator
for (DataSet<?> dataSet : context.getDanglingDataSets()) {
dataSet.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
}
-
- return context;
}
private static <K, V> void translateReshuffle(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index f69bc83..b849770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.getStackTraceAsString;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -31,14 +29,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
@@ -102,73 +98,39 @@ public class FlinkJobInvocation implements JobInvocation {
private PipelineResult runPipeline() throws Exception {
MetricsEnvironment.setMetricsSupported(false);
- FlinkPortablePipelineTranslator<?> translator;
- if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
- // TODO: Do we need to inspect for unbounded sources before fusing?
- translator = FlinkBatchPortablePipelineTranslator.createTranslator();
- } else {
- translator = new FlinkStreamingPortablePipelineTranslator();
- }
- return runPipelineWithTranslator(translator);
- }
-
- private <T extends FlinkPortablePipelineTranslator.TranslationContext>
- PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator)
- throws Exception {
LOG.info("Translating pipeline to Flink program.");
-
- // Don't let the fuser fuse any subcomponents of native transforms.
- // TODO(BEAM-6327): Remove the need for this.
- RunnerApi.Pipeline trimmedPipeline =
- makeKnownUrnsPrimitives(
- pipeline,
- Sets.difference(
- translator.knownUrns(),
- ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
-
// Fused pipeline proto.
- RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+ RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
JobInfo jobInfo =
JobInfo.create(
id,
pipelineOptions.getJobName(),
retrievalToken,
PipelineOptionsTranslation.toProto(pipelineOptions));
+ final JobExecutionResult result;
- FlinkPortablePipelineTranslator.Executor executor =
- translator.translate(
- translator.createTranslationContext(jobInfo, pipelineOptions, confDir, filesToStage),
- fusedPipeline);
- final JobExecutionResult result = executor.execute(pipelineOptions.getJobName());
-
- return FlinkRunner.createPipelineResult(result, pipelineOptions);
- }
-
- private RunnerApi.Pipeline makeKnownUrnsPrimitives(
- RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
- RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
- for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
- if (knownUrns.contains(
- pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
- LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
- removeDescendants(trimmedPipeline, ptransformId);
- }
+ if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+ // TODO: Do we need to inspect for unbounded sources before fusing?
+ // batch translation
+ FlinkBatchPortablePipelineTranslator translator =
+ FlinkBatchPortablePipelineTranslator.createTranslator();
+ FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+ FlinkBatchPortablePipelineTranslator.createTranslationContext(
+ jobInfo, pipelineOptions, confDir, filesToStage);
+ translator.translate(context, fusedPipeline);
+ result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
+ } else {
+ // streaming translation
+ FlinkStreamingPortablePipelineTranslator translator =
+ new FlinkStreamingPortablePipelineTranslator();
+ FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
+ FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+ jobInfo, pipelineOptions, confDir, filesToStage);
+ translator.translate(context, fusedPipeline);
+ result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
}
- return trimmedPipeline.build();
- }
- private void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
- RunnerApi.PTransform parentProto =
- pipeline.getComponents().getTransformsOrDefault(parentId, null);
- if (parentProto != null) {
- for (String childId : parentProto.getSubtransformsList()) {
- removeDescendants(pipeline, childId);
- pipeline.getComponentsBuilder().removeTransforms(childId);
- }
- pipeline
- .getComponentsBuilder()
- .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
- }
+ return FlinkRunner.createPipelineResult(result, pipelineOptions);
}
@Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index eaf0cea..edbc6d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -17,12 +17,8 @@
*/
package org.apache.beam.runners.flink;
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.flink.api.common.JobExecutionResult;
/**
* Interface for portable Flink translators. This allows for a uniform invocation pattern for
@@ -42,19 +38,6 @@ public interface FlinkPortablePipelineTranslator<
FlinkPipelineOptions getPipelineOptions();
}
- /** A handle used to execute a translated pipeline. */
- interface Executor {
- JobExecutionResult execute(String jobName) throws Exception;
- }
-
- T createTranslationContext(
- JobInfo jobInfo,
- FlinkPipelineOptions pipelineOptions,
- @Nullable String confDir,
- List<String> filesToStage);
-
- Set<String> knownUrns();
-
/** Translates the given pipeline. */
- Executor translate(T context, RunnerApi.Pipeline pipeline);
+ void translate(T context, RunnerApi.Pipeline pipeline);
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 838baa8..eed5e28 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -39,7 +39,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SystemReduceFn;
@@ -84,7 +83,6 @@ import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -107,8 +105,7 @@ public class FlinkStreamingPortablePipelineTranslator
* Creates a streaming translation context. The resulting Flink execution dag will live in a new
* {@link StreamExecutionEnvironment}.
*/
- @Override
- public StreamingTranslationContext createTranslationContext(
+ public static StreamingTranslationContext createTranslationContext(
JobInfo jobInfo,
FlinkPipelineOptions pipelineOptions,
String confDir,
@@ -124,8 +121,7 @@ public class FlinkStreamingPortablePipelineTranslator
* the Flink {@link StreamExecutionEnvironment} that the execution plan will be applied to.
*/
public static class StreamingTranslationContext
- implements FlinkPortablePipelineTranslator.TranslationContext,
- FlinkPortablePipelineTranslator.Executor {
+ implements FlinkPortablePipelineTranslator.TranslationContext {
private final JobInfo jobInfo;
private final FlinkPipelineOptions options;
@@ -152,11 +148,6 @@ public class FlinkStreamingPortablePipelineTranslator
return options;
}
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- return getExecutionEnvironment().execute(jobName);
- }
-
public StreamExecutionEnvironment getExecutionEnvironment() {
return executionEnvironment;
}
@@ -201,13 +192,7 @@ public class FlinkStreamingPortablePipelineTranslator
}
@Override
- public Set<String> knownUrns() {
- return urnToTransformTranslator.keySet();
- }
-
- @Override
- public FlinkPortablePipelineTranslator.Executor translate(
- StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
+ public void translate(StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
QueryablePipeline p =
QueryablePipeline.forTransforms(
pipeline.getRootTransformIdsList(), pipeline.getComponents());
@@ -216,8 +201,6 @@ public class FlinkStreamingPortablePipelineTranslator
.getOrDefault(transform.getTransform().getSpec().getUrn(), this::urnNotFound)
.translate(transform.getId(), pipeline, context);
}
-
- return context;
}
private void urnNotFound(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index d34a734..c47cd77 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -33,7 +33,6 @@ from future.utils import itervalues
from apache_beam import typehints
from apache_beam.metrics import Metrics
-from apache_beam.portability import common_urns
from apache_beam.transforms import window
from apache_beam.transforms.core import CombinePerKey
from apache_beam.transforms.core import DoFn
@@ -637,10 +636,3 @@ class Reshuffle(PTransform):
| 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
| ReshufflePerKey()
| 'RemoveRandomKeys' >> Map(lambda t: t[1]))
-
- def to_runner_api_parameter(self, unused_context):
- return common_urns.composites.RESHUFFLE.urn, None
-
- @PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None)
- def from_runner_api_parameter(unused_parameter, unused_context):
- return Reshuffle()