You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/09 08:08:28 UTC

[beam] branch revert-7360-reshuffle-flink created (now cb33143)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a change to branch revert-7360-reshuffle-flink
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at cb33143  Revert "[BEAM-6294] Use Flink rebalance for shuffle."

This branch includes the following new commits:

     new cb33143  Revert "[BEAM-6294] Use Flink rebalance for shuffle."

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: Revert "[BEAM-6294] Use Flink rebalance for shuffle."

Posted by th...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch revert-7360-reshuffle-flink
in repository https://gitbox.apache.org/repos/asf/beam.git

commit cb33143baf0fe22502ea490518eae048904339dd
Author: Thomas Weise <tw...@users.noreply.github.com>
AuthorDate: Wed Jan 9 00:08:15 2019 -0800

    Revert "[BEAM-6294] Use Flink rebalance for shuffle."
---
 .../FlinkBatchPortablePipelineTranslator.java      | 34 +--------
 .../beam/runners/flink/FlinkJobInvocation.java     | 82 ++++++----------------
 .../flink/FlinkPortablePipelineTranslator.java     | 19 +----
 .../FlinkStreamingPortablePipelineTranslator.java  | 23 +-----
 sdks/python/apache_beam/transforms/util.py         |  8 ---
 5 files changed, 29 insertions(+), 137 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index b182832..4a09d86 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -23,7 +23,6 @@ import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTrans
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
 import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
 
-import com.google.auto.service.AutoService;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -44,7 +43,6 @@ import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
-import org.apache.beam.runners.core.construction.NativeTransforms;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
@@ -83,7 +81,6 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -122,8 +119,7 @@ public class FlinkBatchPortablePipelineTranslator
    * Creates a batch translation context. The resulting Flink execution dag will live in a new
    * {@link ExecutionEnvironment}.
    */
-  @Override
-  public BatchTranslationContext createTranslationContext(
+  public static BatchTranslationContext createTranslationContext(
       JobInfo jobInfo,
       FlinkPipelineOptions pipelineOptions,
       @Nullable String confDir,
@@ -162,8 +158,7 @@ public class FlinkBatchPortablePipelineTranslator
    * flink {@link ExecutionEnvironment} that the execution plan will be applied to.
    */
   public static class BatchTranslationContext
-      implements FlinkPortablePipelineTranslator.TranslationContext,
-          FlinkPortablePipelineTranslator.Executor {
+      implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
     private final FlinkPipelineOptions options;
@@ -190,11 +185,6 @@ public class FlinkBatchPortablePipelineTranslator
       return options;
     }
 
-    @Override
-    public JobExecutionResult execute(String jobName) throws Exception {
-      return getExecutionEnvironment().execute(jobName);
-    }
-
     public ExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -239,23 +229,7 @@ public class FlinkBatchPortablePipelineTranslator
   }
 
   @Override
-  public Set<String> knownUrns() {
-    return urnToTransformTranslator.keySet();
-  }
-
-  /** Predicate to determine whether a URN is a Flink native transform. */
-  @AutoService(NativeTransforms.IsNativeTransform.class)
-  public static class IsFlinkNativeTransform implements NativeTransforms.IsNativeTransform {
-    @Override
-    public boolean test(RunnerApi.PTransform pTransform) {
-      return PTransformTranslation.RESHUFFLE_URN.equals(
-          PTransformTranslation.urnForTransformOrNull(pTransform));
-    }
-  }
-
-  @Override
-  public FlinkPortablePipelineTranslator.Executor translate(
-      BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
+  public void translate(BatchTranslationContext context, RunnerApi.Pipeline pipeline) {
     // Use a QueryablePipeline to traverse transforms topologically.
     QueryablePipeline p =
         QueryablePipeline.forTransforms(
@@ -272,8 +246,6 @@ public class FlinkBatchPortablePipelineTranslator
     for (DataSet<?> dataSet : context.getDanglingDataSets()) {
       dataSet.output(new DiscardingOutputFormat<>()).name("DiscardingOutput");
     }
-
-    return context;
   }
 
   private static <K, V> void translateReshuffle(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index f69bc83..b849770 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Throwables.getStackTraceAsString;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,14 +29,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
@@ -102,73 +98,39 @@ public class FlinkJobInvocation implements JobInvocation {
   private PipelineResult runPipeline() throws Exception {
     MetricsEnvironment.setMetricsSupported(false);
 
-    FlinkPortablePipelineTranslator<?> translator;
-    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
-      // TODO: Do we need to inspect for unbounded sources before fusing?
-      translator = FlinkBatchPortablePipelineTranslator.createTranslator();
-    } else {
-      translator = new FlinkStreamingPortablePipelineTranslator();
-    }
-    return runPipelineWithTranslator(translator);
-  }
-
-  private <T extends FlinkPortablePipelineTranslator.TranslationContext>
-      PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator)
-          throws Exception {
     LOG.info("Translating pipeline to Flink program.");
-
-    // Don't let the fuser fuse any subcomponents of native transforms.
-    // TODO(BEAM-6327): Remove the need for this.
-    RunnerApi.Pipeline trimmedPipeline =
-        makeKnownUrnsPrimitives(
-            pipeline,
-            Sets.difference(
-                translator.knownUrns(),
-                ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));
-
     // Fused pipeline proto.
-    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
     JobInfo jobInfo =
         JobInfo.create(
             id,
             pipelineOptions.getJobName(),
             retrievalToken,
             PipelineOptionsTranslation.toProto(pipelineOptions));
+    final JobExecutionResult result;
 
-    FlinkPortablePipelineTranslator.Executor executor =
-        translator.translate(
-            translator.createTranslationContext(jobInfo, pipelineOptions, confDir, filesToStage),
-            fusedPipeline);
-    final JobExecutionResult result = executor.execute(pipelineOptions.getJobName());
-
-    return FlinkRunner.createPipelineResult(result, pipelineOptions);
-  }
-
-  private RunnerApi.Pipeline makeKnownUrnsPrimitives(
-      RunnerApi.Pipeline pipeline, Set<String> knownUrns) {
-    RunnerApi.Pipeline.Builder trimmedPipeline = pipeline.toBuilder();
-    for (String ptransformId : pipeline.getComponents().getTransformsMap().keySet()) {
-      if (knownUrns.contains(
-          pipeline.getComponents().getTransformsOrThrow(ptransformId).getSpec().getUrn())) {
-        LOG.debug("Removing descendants of known PTransform {}" + ptransformId);
-        removeDescendants(trimmedPipeline, ptransformId);
-      }
+    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(fusedPipeline)) {
+      // TODO: Do we need to inspect for unbounded sources before fusing?
+      // batch translation
+      FlinkBatchPortablePipelineTranslator translator =
+          FlinkBatchPortablePipelineTranslator.createTranslator();
+      FlinkBatchPortablePipelineTranslator.BatchTranslationContext context =
+          FlinkBatchPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, confDir, filesToStage);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
+    } else {
+      // streaming translation
+      FlinkStreamingPortablePipelineTranslator translator =
+          new FlinkStreamingPortablePipelineTranslator();
+      FlinkStreamingPortablePipelineTranslator.StreamingTranslationContext context =
+          FlinkStreamingPortablePipelineTranslator.createTranslationContext(
+              jobInfo, pipelineOptions, confDir, filesToStage);
+      translator.translate(context, fusedPipeline);
+      result = context.getExecutionEnvironment().execute(pipelineOptions.getJobName());
     }
-    return trimmedPipeline.build();
-  }
 
-  private void removeDescendants(RunnerApi.Pipeline.Builder pipeline, String parentId) {
-    RunnerApi.PTransform parentProto =
-        pipeline.getComponents().getTransformsOrDefault(parentId, null);
-    if (parentProto != null) {
-      for (String childId : parentProto.getSubtransformsList()) {
-        removeDescendants(pipeline, childId);
-        pipeline.getComponentsBuilder().removeTransforms(childId);
-      }
-      pipeline
-          .getComponentsBuilder()
-          .putTransforms(parentId, parentProto.toBuilder().clearSubtransforms().build());
-    }
+    return FlinkRunner.createPipelineResult(result, pipelineOptions);
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
index eaf0cea..edbc6d3 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortablePipelineTranslator.java
@@ -17,12 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
-import java.util.List;
-import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
-import org.apache.flink.api.common.JobExecutionResult;
 
 /**
  * Interface for portable Flink translators. This allows for a uniform invocation pattern for
@@ -42,19 +38,6 @@ public interface FlinkPortablePipelineTranslator<
     FlinkPipelineOptions getPipelineOptions();
   }
 
-  /** A handle used to execute a translated pipeline. */
-  interface Executor {
-    JobExecutionResult execute(String jobName) throws Exception;
-  }
-
-  T createTranslationContext(
-      JobInfo jobInfo,
-      FlinkPipelineOptions pipelineOptions,
-      @Nullable String confDir,
-      List<String> filesToStage);
-
-  Set<String> knownUrns();
-
   /** Translates the given pipeline. */
-  Executor translate(T context, RunnerApi.Pipeline pipeline);
+  void translate(T context, RunnerApi.Pipeline pipeline);
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 838baa8..eed5e28 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -39,7 +39,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SystemReduceFn;
@@ -84,7 +83,6 @@ import org.apache.beam.sdk.values.PCollectionViews;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -107,8 +105,7 @@ public class FlinkStreamingPortablePipelineTranslator
    * Creates a streaming translation context. The resulting Flink execution dag will live in a new
    * {@link StreamExecutionEnvironment}.
    */
-  @Override
-  public StreamingTranslationContext createTranslationContext(
+  public static StreamingTranslationContext createTranslationContext(
       JobInfo jobInfo,
       FlinkPipelineOptions pipelineOptions,
       String confDir,
@@ -124,8 +121,7 @@ public class FlinkStreamingPortablePipelineTranslator
    * the Flink {@link StreamExecutionEnvironment} that the execution plan will be applied to.
    */
   public static class StreamingTranslationContext
-      implements FlinkPortablePipelineTranslator.TranslationContext,
-          FlinkPortablePipelineTranslator.Executor {
+      implements FlinkPortablePipelineTranslator.TranslationContext {
 
     private final JobInfo jobInfo;
     private final FlinkPipelineOptions options;
@@ -152,11 +148,6 @@ public class FlinkStreamingPortablePipelineTranslator
       return options;
     }
 
-    @Override
-    public JobExecutionResult execute(String jobName) throws Exception {
-      return getExecutionEnvironment().execute(jobName);
-    }
-
     public StreamExecutionEnvironment getExecutionEnvironment() {
       return executionEnvironment;
     }
@@ -201,13 +192,7 @@ public class FlinkStreamingPortablePipelineTranslator
   }
 
   @Override
-  public Set<String> knownUrns() {
-    return urnToTransformTranslator.keySet();
-  }
-
-  @Override
-  public FlinkPortablePipelineTranslator.Executor translate(
-      StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
+  public void translate(StreamingTranslationContext context, RunnerApi.Pipeline pipeline) {
     QueryablePipeline p =
         QueryablePipeline.forTransforms(
             pipeline.getRootTransformIdsList(), pipeline.getComponents());
@@ -216,8 +201,6 @@ public class FlinkStreamingPortablePipelineTranslator
           .getOrDefault(transform.getTransform().getSpec().getUrn(), this::urnNotFound)
           .translate(transform.getId(), pipeline, context);
     }
-
-    return context;
   }
 
   private void urnNotFound(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index d34a734..c47cd77 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -33,7 +33,6 @@ from future.utils import itervalues
 
 from apache_beam import typehints
 from apache_beam.metrics import Metrics
-from apache_beam.portability import common_urns
 from apache_beam.transforms import window
 from apache_beam.transforms.core import CombinePerKey
 from apache_beam.transforms.core import DoFn
@@ -637,10 +636,3 @@ class Reshuffle(PTransform):
             | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
             | ReshufflePerKey()
             | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
-
-  def to_runner_api_parameter(self, unused_context):
-    return common_urns.composites.RESHUFFLE.urn, None
-
-  @PTransform.register_urn(common_urns.composites.RESHUFFLE.urn, None)
-  def from_runner_api_parameter(unused_parameter, unused_context):
-    return Reshuffle()