You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/29 15:02:45 UTC
[beam] branch master updated: [BEAM-6937] Apply
FlinkTransformOverrides correctly with inferred streaming mode
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c159d55 [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
new 9cf7afc Merge pull request #8168: [BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
c159d55 is described below
commit c159d550856e92c70fb81471f45ebb91c2948b2e
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Mar 29 13:37:10 2019 +0100
[BEAM-6937] Apply FlinkTransformOverrides correctly with inferred streaming mode
When streaming is set to false, the streaming mode will be switched to true if
the pipeline contains unbounded sources. There is a regression which prevents
PipelineOverrides from being applied correctly in this case.
---
.../core/construction/PTransformMatchers.java | 18 +++++++
.../runners/flink/CreateStreamingFlinkView.java | 5 +-
.../flink/FlinkPipelineExecutionEnvironment.java | 14 +++--
.../runners/flink/FlinkTransformOverrides.java | 2 +-
.../flink/PipelineTranslationModeOptimizer.java | 24 ++++-----
.../FlinkPipelineExecutionEnvironmentTest.java | 59 ++++++++++++++++++++++
.../PipelineTranslationModeOptimizerTest.java | 36 +++++--------
7 files changed, 112 insertions(+), 46 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 3b7014c..6af3f03 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -21,6 +21,7 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.WR
import java.io.IOException;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -75,6 +76,23 @@ public class PTransformMatchers {
public String toString() {
return MoreObjects.toStringHelper(this).add("urn", urn).toString();
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EqualUrnPTransformMatcher that = (EqualUrnPTransformMatcher) o;
+ return urn.equals(that.urn);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(urn);
+ }
}
/**
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
index df06707..f94c961 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/CreateStreamingFlinkView.java
@@ -138,7 +138,10 @@ class CreateStreamingFlinkView<ElemT, ViewT>
PCollection<ElemT>,
PCollection<ElemT>,
PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
- public Factory() {}
+
+ static final Factory INSTANCE = new Factory();
+
+ private Factory() {}
@Override
public PTransformReplacement<PCollection<ElemT>, PCollection<ElemT>> getReplacementTransform(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index af97278..8e15808 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -82,10 +82,15 @@ class FlinkPipelineExecutionEnvironment {
this.flinkBatchEnv = null;
this.flinkStreamEnv = null;
- pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));
+ final boolean hasUnboundedOutput =
+ PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline);
+ if (hasUnboundedOutput) {
+ LOG.info("Found unbounded PCollection. Switching to streaming execution.");
+ options.setStreaming(true);
+ }
- PipelineTranslationModeOptimizer optimizer = new PipelineTranslationModeOptimizer(options);
- optimizer.translate(pipeline);
+ // Replace transforms only after determining the execution mode (batch/streaming)
+ pipeline.replaceAll(FlinkTransformOverrides.getDefaultOverrides(options));
// Needs to be done before creating the Flink ExecutionEnvironments
prepareFilesToStageForRemoteClusterExecution(options);
@@ -95,8 +100,7 @@ class FlinkPipelineExecutionEnvironment {
this.flinkStreamEnv =
FlinkExecutionEnvironments.createStreamExecutionEnvironment(
options, options.getFilesToStage());
- if (optimizer.hasUnboundedSources()
- && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
+ if (hasUnboundedOutput && !flinkStreamEnv.getCheckpointConfig().isCheckpointingEnabled()) {
LOG.warn(
"UnboundedSources present which rely on checkpointing, but checkpointing is disabled.");
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
index 29ab365..160918b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkTransformOverrides.java
@@ -53,7 +53,7 @@ class FlinkTransformOverrides {
.add(
PTransformOverride.of(
PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
- new CreateStreamingFlinkView.Factory()));
+ CreateStreamingFlinkView.Factory.INSTANCE));
}
return builder.build();
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
index f852ad7..4d90bc8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizer.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,18 +35,19 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(PipelineTranslationModeOptimizer.class);
- private final FlinkPipelineOptions options;
- private boolean hasRun;
- private boolean hasUnboundedSources;
+ private boolean hasUnboundedCollections;
- public PipelineTranslationModeOptimizer(FlinkPipelineOptions options) {
- this.options = options;
+ static boolean hasUnboundedOutput(Pipeline p) {
+ PipelineTranslationModeOptimizer optimizer = new PipelineTranslationModeOptimizer();
+ optimizer.translate(p);
+ return optimizer.hasUnboundedCollections;
}
+ private PipelineTranslationModeOptimizer() {}
+
@Override
public void translate(Pipeline pipeline) {
super.translate(pipeline);
- hasRun = true;
}
@Override
@@ -63,9 +63,8 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
AppliedPTransform<?, ?, ?> appliedPTransform = node.toAppliedPTransform(getPipeline());
if (hasUnboundedOutput(appliedPTransform)) {
Class<? extends PTransform> transformClass = node.getTransform().getClass();
- LOG.info("Found {}. Switching to streaming execution.", transformClass);
- options.setStreaming(true);
- hasUnboundedSources = true;
+ LOG.debug("Found unbounded PCollection for transform %s", transformClass);
+ hasUnboundedCollections = true;
}
}
@@ -78,9 +77,4 @@ class PipelineTranslationModeOptimizer extends FlinkPipelineTranslator {
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {}
-
- boolean hasUnboundedSources() {
- Preconditions.checkState(hasRun, "%s has not run yet.", getClass().getSimpleName());
- return hasUnboundedSources;
- }
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
index 8d0451e..838a880 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironmentTest.java
@@ -38,6 +38,8 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
@@ -53,6 +55,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.RemoteEnvironment;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
@@ -215,6 +218,62 @@ public class FlinkPipelineExecutionEnvironmentTest implements Serializable {
}
@Test
+ public void shouldUseStreamingTransformOverridesWithUnboundedSources() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ // no explicit streaming mode set
+ options.setRunner(FlinkRunner.class);
+ FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+ Pipeline p = Mockito.spy(Pipeline.create(options));
+
+ // Add unbounded source which will set the streaming mode to true
+ p.apply(GenerateSequence.from(0));
+
+ flinkEnv.translate(p);
+
+ ArgumentCaptor<ImmutableList> captor = ArgumentCaptor.forClass(ImmutableList.class);
+ Mockito.verify(p).replaceAll(captor.capture());
+ ImmutableList<PTransformOverride> overridesList = captor.getValue();
+
+ assertThat(
+ overridesList,
+ hasItem(
+ PTransformOverride.of(
+ PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
+ CreateStreamingFlinkView.Factory.INSTANCE)));
+ }
+
+ @Test
+ public void testTranslationModeOverrideWithUnboundedSources() {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setStreaming(false);
+
+ FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(GenerateSequence.from(0));
+ flinkEnv.translate(pipeline);
+
+ assertThat(options.isStreaming(), Matchers.is(true));
+ }
+
+ @Test
+ public void testTranslationModeNoOverrideWithoutUnboundedSources() {
+ boolean[] testArgs = new boolean[] {true, false};
+ for (boolean streaming : testArgs) {
+ FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ options.setRunner(FlinkRunner.class);
+ options.setStreaming(streaming);
+
+ FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(GenerateSequence.from(0).to(10));
+ flinkEnv.translate(pipeline);
+
+ assertThat(options.isStreaming(), Matchers.is(streaming));
+ }
+ }
+
+ @Test
public void shouldLogWarningWhenCheckpointingIsDisabled() {
Pipeline pipeline = Pipeline.create();
pipeline.getOptions().setRunner(TestFlinkRunner.class);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
index 2395c0d..f1b87b1 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineTranslationModeOptimizerTest.java
@@ -17,47 +17,35 @@
*/
package org.apache.beam.runners.flink;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;
-/**
- * Traverses the Pipeline to determine the translation mode (i.e. streaming or batch) for this
- * pipeline.
- */
+/** Tests for {@link PipelineTranslationModeOptimizer}. */
public class PipelineTranslationModeOptimizerTest {
@Test
- public void testTranslationModeOverrideWithUnboundedSources() {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+ public void testUnboundedCollectionProducingTransform() {
+ PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(FlinkRunner.class);
- options.setStreaming(false);
-
- FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(GenerateSequence.from(0));
- flinkEnv.translate(pipeline);
- assertThat(options.isStreaming(), is(true));
+ assertThat(PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline), is(true));
}
@Test
- public void testTranslationModeNoOverrideWithoutUnboundedSources() {
- boolean[] testArgs = new boolean[] {true, false};
- for (boolean streaming : testArgs) {
- FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- options.setRunner(FlinkRunner.class);
- options.setStreaming(streaming);
-
- FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
- Pipeline pipeline = Pipeline.create(options);
- flinkEnv.translate(pipeline);
+ public void testBoundedCollectionProducingTransform() {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setRunner(FlinkRunner.class);
+ Pipeline pipeline = Pipeline.create(options);
+ pipeline.apply(GenerateSequence.from(0).to(10));
- assertThat(options.isStreaming(), is(streaming));
- }
+ assertThat(PipelineTranslationModeOptimizer.hasUnboundedOutput(pipeline), is(false));
}
}