You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/09 08:08:29 UTC

[beam] 01/01: Revert "[BEAM-6294] Use Flink rebalance for shuffle."

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch revert-7360-reshuffle-flink
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cb33143baf0fe22502ea490518eae048904339dd
Author: Thomas Weise <tw...@users.noreply.github.com>
AuthorDate: Wed Jan 9 00:08:15 2019 -0800

    Revert "[BEAM-6294] Use Flink rebalance for shuffle."
---
 .../FlinkBatchPortablePipelineTranslator.java      | 34 +--------
 .../beam/runners/flink/FlinkJobInvocation.java     | 82 ++++++----------------
 .../flink/FlinkPortablePipelineTranslator.java     | 19 +----
 .../FlinkStreamingPortablePipelineTranslator.java  | 23 +-----
 sdks/python/apache_beam/transforms/util.py         |  8 ---
 5 files changed, 29 insertions(+), 137 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b182832..4a09d86 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTrans
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
-import com.google.auto.service.AutoService;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -44,7 +43,6 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
-import org.apache.beam.runners.core.construction.NativeTransforms;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -83,7 +81,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -122,8 +119,7 @@ public class FlinkBatchPortablePipelineTranslator
    * Creates a batch translation context. The resulting Flink execution dag will live in a new
    * {@link ExecutionEnvironment}.
    */
-  @Override
-  public BatchTranslationContext createTranslationContext(
+  public static BatchTranslationContext createTranslationContext(
       JobInfo jobInfo,
       FlinkPipelineOptions pipelineOptions,
       @Nullable String confDir,
@@ -162,8 +158,7 @@ public class FlinkBatchPortablePipelineTranslator
    * flink {@link ExecutionEnvironment} that the execution plan will be applied to.
    */
   public static class BatchTranslationContext
-      implements FlinkPortablePipelineTranslator.TranslationContext,
-          FlinkPortablePipelineTranslator.Executor {
+      implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
     private final FlinkPipelineOptions options;
@@ -190,11 +185,6 @@ public class FlinkBatchPortablePipelineTranslator
       return options;
     }
 
-    @Override
-    public JobExecutionResult execute(String jobName) throws Exception {
-      return getExecutionEnvironment().execute(jobName);
-    }
-
     public ExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -239,23 +229,7 @@ public class FlinkBatchPortablePipelineTranslator
   }
 
   @Override
-  public Set<String> knownUrns() {
-    return urnToTransformTranslator.keySet();
-  }
-
-  /** Predicate to determine whether a URN is a Flink native transform. */
-  @AutoService(NativeTransforms.IsNativeTransform.class)
-  public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
-    @Override
-    public boolean test(RunnerApi.PTransform pTransform) {
-      return PTransformTranslation.RESHUFFLE_URN.equals(
-          PTransformTranslation.urnForTransformOrNull(pTransform));
-    }
-  }
-
-  @Override
-  public FlinkPortablePipelineTranslator.Executor translate(
-      BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
+  public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
     // Use a QueryablePipeline to traverse transforms topologically.
     QueryablePipeline p =
         QueryablePipeline.forTransforms(
@@ -272,8 +246,6 @@ public class FlinkBatchPortablePipelineTranslator
     for (DataSet<?> dataSet : context.getDanglingDataSets()) {
       dataSet.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
     }
-
-    return context;
   }
 
   private static <K, V> void translateReshuffle(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index f69bc83..b849770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Throwables.getStackTraceAsString;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,14 +29,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
@@ -102,73 +98,39 @@ public class FlinkJobInvocation implements JobInvocation {
   private PipelineResult runPipeline() throws Exception {
     MetricsEnvironment.setMetricsSupported(false);
 
-    FlinkPortablePipelineTranslator<?> translator;
-    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
-      // TODO: Do we need to inspect for unbounded sources before fusing?
-      translator = FlinkBatchPortablePipelineTranslator.createTranslator();
-    } else {
-      translator = new FlinkStreamingPortablePipelineTranslator();
-    }
-    return runPipelineWithTranslator(translator);
-  }
-
-  private <T extends FlinkPortablePipelineTranslator.TranslationContext>
-      PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator)
-          throws Exception {
     LOG.info("Translating pipeline to Flink program.");
-
-    // Don't let the fuser fuse any subcomponents of native transforms.
-    // TODO(BEAM-6327): Remove the need for this.
-    RunnerApi.Pipeline trimmedPipeline =
-        makeKnownUrnsPrimitives(
-            pipeline,
-            Sets.difference(
-                translator.knownUrns(),
-                ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
-
     // Fused pipeline proto.
-    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
     JobInfo jobInfo =
         JobInfo.create(
             id,
             pipelineOptions.getJobName(),
             retrievalToken,
             PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
 
-    FlinkPortablePipelineTranslator.Executor executor =
-        translator.translate(
-            translator.createTranslationContext(jobInfo, pipelineOptions, confDir, filesToStage),
-            fusedPipeline);
-    final JobExecutionResult result = executor.execute(pipelineOptions.getJobName());
-
-    return FlinkRunner.createPipelineResult(result, pipelineOptions);
-  }
-
-  private RunnerApi.Pipeline makeKnownUrnsPrimitives(
-      RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
-    RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
-    for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
-      if (knownUrns.contains(
-          pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
-        LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
-        removeDescendants(trimmedPipeline, ptransformId);
-      }
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, confDir, filesToStage);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
+    } else {
+      // streaming translation
+      FlinkStreamingPortablePipelineTranslator translator =
+          new FlinkStreamingPortablePipelineTranslator();
+      FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
+          FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, confDir, filesToStage);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     }
-    return trimmedPipeline.build();
-  }
 
-  private void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
-    RunnerApi.PTransform parentProto =
-        pipeline.getComponents().getTransformsOrDefault(parentId, null);
-    if (parentProto != null) {
-      for (String childId : parentProto.getSubtransformsList()) {
-        removeDescendants(pipeline, childId);
-        pipeline.getComponentsBuilder().removeTransforms(childId);
-      }
-      pipeline
-          .getComponentsBuilder()
-          .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
-    }
+    return FlinkRunner.createPipelineResult(result, pipelineOptions);
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index eaf0cea..edbc6d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -17,12 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.flink.api.common.JobExecutionResult;
 
 /**
  * Interface for portable Flink translators. This allows for a uniform invocation pattern for
@@ -42,19 +38,6 @@ public interface FlinkPortablePipelineTranslator<
     FlinkPipelineOptions getPipelineOptions();
   }
 
-  /** A handle used to execute a translated pipeline. */
-  interface Executor {
-    JobExecutionResult execute(String jobName) throws Exception;
-  }
-
-  T createTranslationContext(
-      JobInfo jobInfo,
-      FlinkPipelineOptions pipelineOptions,
-      @Nullable String confDir,
-      List<String> filesToStage);
-
-  Set<String> knownUrns();
-
   /** Translates the given pipeline. */
-  Executor translate(T context, RunnerApi.Pipeline pipeline);
+  void translate(T context, RunnerApi.Pipeline pipeline);
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 838baa8..eed5e28 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -39,7 +39,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -84,7 +83,6 @@ import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -107,8 +105,7 @@ public class FlinkStreamingPortablePipelineTranslator
    * Creates a streaming translation context. The resulting Flink execution dag will live in a new
    * {@link StreamExecutionEnvironment}.
    */
-  @Override
-  public StreamingTranslationContext createTranslationContext(
+  public static StreamingTranslationContext createTranslationContext(
       JobInfo jobInfo,
       FlinkPipelineOptions pipelineOptions,
       String confDir,
@@ -124,8 +121,7 @@ public class FlinkStreamingPortablePipelineTranslator
    * the Flink {@link StreamExecutionEnvironment} that the execution plan will be applied to.
    */
   public static class StreamingTranslationContext
-      implements FlinkPortablePipelineTranslator.TranslationContext,
-          FlinkPortablePipelineTranslator.Executor {
+      implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
     private final FlinkPipelineOptions options;
@@ -152,11 +148,6 @@ public class FlinkStreamingPortablePipelineTranslator
       return options;
     }
 
-    @Override
-    public JobExecutionResult execute(String jobName) throws Exception {
-      return getExecutionEnvironment().execute(jobName);
-    }
-
     public StreamExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -201,13 +192,7 @@ public class FlinkStreamingPortablePipelineTranslator
   }
 
   @Override
-  public Set<String> knownUrns() {
-    return urnToTransformTranslator.keySet();
-  }
-
-  @Override
-  public FlinkPortablePipelineTranslator.Executor translate(
-      StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
+  public void translate(StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
     QueryablePipeline p =
         QueryablePipeline.forTransforms(
             pipeline.getRootTransformIdsList(), pipeline.getComponents());
@@ -216,8 +201,6 @@ public class FlinkStreamingPortablePipelineTranslator
           .getOrDefault(transform.getTransform().getSpec().getUrn(), this::urnNotFound)
           .translate(transform.getId(), pipeline, context);
     }
-
-    return context;
   }
 
   private void urnNotFound(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index d34a734..c47cd77 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -33,7 +33,6 @@ from future.utils import itervalues
 
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
-from apache_beam.portability import common_urns
 from apache_beam.transforms import window
 from apache_beam.transforms.core import CombinePerKey
 from apache_beam.transforms.core import DoFn
@@ -637,10 +636,3 @@ class Reshuffle(PTransform):
             | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
             | ReshufflePerKey()
             | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
-
-  def to_runner_api_parameter(self, unused_context):
-    return common_urns.composites.RESHUFFLE.urn, None
-
-  @PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None)
-  def from_runner_api_parameter(unused_parameter, unused_context):
-    return Reshuffle()