Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/29 15:02:45 UTC

[beam] branch master updated: [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c159d55  [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
     new 9cf7afc  Merge pull request #8168: [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
c159d55 is described below

commit c159d550856e92c70fb81471f45ebb91c2948b2e
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Mar 29 13:37:10 2019 +0100

    [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
    
    When streaming is set to false, the streaming mode will still be switched to true
    if the pipeline contains unbounded sources. There is a regression that prevents
    the FlinkTransformOverrides from being applied correctly in this case.
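
In short, the corrected control flow in FlinkPipelineExecutionEnvironment#translate first
inspects the pipeline for unbounded PCollections, switches to streaming execution if any are
found, and only then builds and applies the transform overrides. The following is a minimal
sketch rather than the verbatim patch; it uses only names that appear in the diff below (LOG,
options, and pipeline are a field, a field, and a parameter of that class, respectively).

    // Sketch of the new ordering: decide the execution mode before building overrides.
    boolean hasUnboundedOutput = PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline);
    if (hasUnboundedOutput) {
      LOG.info("Found unbounded PCollection. Switching to streaming execution.");
      options.setStreaming(true);
    }
    // Build the overrides only after the execution mode is final, so streaming-only
    // overrides such as CreateStreamingFlinkView are included when needed.
    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));

Previously the overrides were applied before the optimizer flipped the streaming flag, so a
pipeline that only became "streaming" through an unbounded source missed the streaming-specific
overrides.
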
---
 .../core/construction/PTransformMatchers.java      | 18 +++++++
 .../runners/flink/CreateStreamingFlinkView.java    |  5 +-
 .../flink/FlinkPipelineExecutionEnvironment.java   | 14 +++--
 .../runners/flink/FlinkTransformOverrides.java     |  2 +-
 .../flink/PipelineTranslationModeOptimizer.java    | 24 ++++-----
 .../FlinkPipelineExecutionEnvironmentTest.java     | 59 ++++++++++++++++++++++
 .../PipelineTranslationModeOptimizerTest.java      | 36 +++++--------
 7 files changed, 112 insertions(+), 46 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 3b7014c..6af3f03 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -21,6 +21,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -75,6 +76,23 @@ public class PTransformMatchers {
     public String toString() {
       return MoreObjects.toStringHelper(this).add("urn", urn).toString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      EqualUrnPTransformMatcher that = (EqualUrnPTransformMatcher) o;
+      return urn.equals(that.urn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(urn);
+    }
   }
 
   /**
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index df06707..f94c961 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -138,7 +138,10 @@ class CreateStreamingFlinkView<ElemT, ViewT>
           PCollection<ElemT>,
           PCollection<ElemT>,
           PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
-    public Factory() {}
+
+    static final Factory INSTANCE = new Factory();
+
+    private Factory() {}
 
     @Override
     public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index af97278..8e15808 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -82,10 +82,15 @@ class FlinkPipelineExecutionEnvironment {
     this.flinkBatchEnv = null;
     this.flinkStreamEnv = null;
 
-    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));
+    final boolean hasUnboundedOutput =
+        PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline);
+    if (hasUnboundedOutput) {
+      LOG.info("Found unbounded PCollection. Switching to streaming execution.");
+      options.setStreaming(true);
+    }
 
-    PipelineTranslationModeOptimizer optimizer = new PipelineTranslationModeOptimizer(options);
-    optimizer.translate(pipeline);
+    // Replace transforms only after determining the execution mode (batch/streaming)
+    pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));
 
     // Needs to be done before creating the Flink ExecutionEnvironments
     prepareFilesToStageForRemoteClusterExecution(options);
@@ -95,8 +100,7 @@ class FlinkPipelineExecutionEnvironment {
       this.flinkStreamEnv =
           FlinkExecutionEnvironments.createStreamExecutionEnvironment(
               options, options.getFilesToStage());
-      if (optimizer.hasUnboundedSources()
-          && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
+      if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
         LOG.warn(
             "UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
       }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
index 29ab365..160918b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -53,7 +53,7 @@ class FlinkTransformOverrides {
           .add(
               PTransformOverride.of(
                   PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
-                  new CreateStreamingFlinkView.Factory()));
+                  CreateStreamingFlinkView.Factory.INSTANCE));
     }
     return builder.build();
   }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
index f852ad7..4d90bc8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,18 +35,19 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
 
   private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationModeOptimizer.class);
 
-  private final FlinkPipelineOptions options;
-  private boolean hasRun;
-  private boolean hasUnboundedSources;
+  private boolean hasUnboundedCollections;
 
-  public PipelineTranslationModeOptimizer(FlinkPipelineOptions options) {
-    this.options = options;
+  static boolean hasUnboundedOutput(Pipeline p) {
+    PipelineTranslationModeOptimizer optimizer = new PipelineTranslationModeOptimizer();
+    optimizer.translate(p);
+    return optimizer.hasUnboundedCollections;
   }
 
+  private PipelineTranslationModeOptimizer() {}
+
   @Override
   public void translate(Pipeline pipeline) {
     super.translate(pipeline);
-    hasRun = true;
   }
 
   @Override
@@ -63,9 +63,8 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
     AppliedPTransform<?, ?, ?> appliedPTransform = node.toAppliedPTransform(getPipeline());
     if (hasUnboundedOutput(appliedPTransform)) {
       Class<? extends PTransform> transformClass = node.getTransform().getClass();
-      LOG.info("Found {}. Switching to streaming execution.", transformClass);
-      options.setStreaming(true);
-      hasUnboundedSources = true;
+      LOG.debug("Found unbounded PCollection for transform %s", transformClass);
+      hasUnboundedCollections = true;
     }
   }
 
@@ -78,9 +77,4 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
 
   @Override
   public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-
-  boolean hasUnboundedSources() {
-    Preconditions.checkState(hasRun, "%s has not run yet.", getClass().getSimpleName());
-    return hasUnboundedSources;
-  }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 8d0451e..838a880 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -38,6 +38,8 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.TextIO;
@@ -53,6 +55,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.RemoteEnvironment;
 import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Rule;
 import org.junit.Test;
@@ -215,6 +218,62 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
   }
 
   @Test
+  public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    // no explicit streaming mode set
+    options.setRunner(FlinkRunner.class);
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+    Pipeline p = Mockito.spy(Pipeline.create(options));
+
+    // Add unbounded source which will set the streaming mode to true
+    p.apply(GenerateSequence.from(0));
+
+    flinkEnv.translate(p);
+
+    ArgumentCaptor<ImmutableList> captor = ArgumentCaptor.forClass(ImmutableList.class);
+    Mockito.verify(p).replaceAll(captor.capture());
+    ImmutableList<PTransformOverride> overridesList = captor.getValue();
+
+    assertThat(
+        overridesList,
+        hasItem(
+            PTransformOverride.of(
+                PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
+                CreateStreamingFlinkView.Factory.INSTANCE)));
+  }
+
+  @Test
+  public void testTranslationModeOverrideWithUnboundedSources() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setRunner(FlinkRunner.class);
+    options.setStreaming(false);
+
+    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(GenerateSequence.from(0));
+    flinkEnv.translate(pipeline);
+
+    assertThat(options.isStreaming(), Matchers.is(true));
+  }
+
+  @Test
+  public void testTranslationModeNoOverrideWithoutUnboundedSources() {
+    boolean[] testArgs = new boolean[] {true, false};
+    for (boolean streaming : testArgs) {
+      FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+      options.setRunner(FlinkRunner.class);
+      options.setStreaming(streaming);
+
+      FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+      Pipeline pipeline = Pipeline.create(options);
+      pipeline.apply(GenerateSequence.from(0).to(10));
+      flinkEnv.translate(pipeline);
+
+      assertThat(options.isStreaming(), Matchers.is(streaming));
+    }
+  }
+
+  @Test
   public void shouldLogWarningWhenCheckpointingIsDisabled() {
     Pipeline pipeline = Pipeline.create();
     pipeline.getOptions().setRunner(TestFlinkRunner.class);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
index 2395c0d..f1b87b1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
@@ -17,47 +17,35 @@
  */
 package org.apache.beam.runners.flink;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Test;
 
-/**
- * Traverses the Pipeline to determine the translation mode (i.e. streaming or batch) for this
- * pipeline.
- */
+/** Tests for {@link PipelineTranslationModeOptimizer}. */
 public class PipelineTranslationModeOptimizerTest {
 
   @Test
-  public void testTranslationModeOverrideWithUnboundedSources() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+  public void testUnboundedCollectionProducingTransform() {
+    PipelineOptions options = PipelineOptionsFactory.create();
     options.setRunner(FlinkRunner.class);
-    options.setStreaming(false);
-
-    FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
     Pipeline pipeline = Pipeline.create(options);
     pipeline.apply(GenerateSequence.from(0));
-    flinkEnv.translate(pipeline);
 
-    assertThat(options.isStreaming(), is(true));
+    assertThat(PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline), is(true));
   }
 
   @Test
-  public void testTranslationModeNoOverrideWithoutUnboundedSources() {
-    boolean[] testArgs = new boolean[] {true, false};
-    for (boolean streaming : testArgs) {
-      FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-      options.setRunner(FlinkRunner.class);
-      options.setStreaming(streaming);
-
-      FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
-      Pipeline pipeline = Pipeline.create(options);
-      flinkEnv.translate(pipeline);
+  public void testBoundedCollectionProducingTransform() {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setRunner(FlinkRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(GenerateSequence.from(0).to(10));
 
-      assertThat(options.isStreaming(), is(streaming));
-    }
+    assertThat(PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline), is(false));
   }
 }