You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/11/01 09:54:28 UTC

[beam] 01/01: Merge pull request #6897: [BEAM-5464] Allow to configure ExecutionMode for batch pipelines

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 03c08377d3b13fcdcb489cdd46ab24c70e58705d
Merge: d046063 329e51f
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Nov 1 10:22:56 2018 +0100

    Merge pull request #6897: [BEAM-5464] Allow to configure ExecutionMode for batch pipelines

 .../beam/runners/flink/FlinkExecutionEnvironments.java       |  3 +++
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java  | 12 ++++++++++++
 .../org/apache/beam/runners/flink/PipelineOptionsTest.java   |  2 ++
 3 files changed, 17 insertions(+)

diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b695c37,a84b964..234b457
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@@ -187,4 -189,12 +189,14 @@@ public interface FlinkPipelineOption
    Long getLatencyTrackingInterval();
  
    void setLatencyTrackingInterval(Long interval);
+ 
+   @Description(
 -      "Flink mode for data exchange for batch pipeline. "
 -          + "Reference {@link org.apache.flink.api.common.ExecutionMode}")
++      "Flink mode for data exchange of batch pipelines. "
++          + "Reference {@link org.apache.flink.api.common.ExecutionMode}. "
++          + "Set this to BATCH_FORCED if pipelines get blocked, see "
++          + "https://issues.apache.org/jira/browse/FLINK-10672")
+   @Default.Enum(PIPELINED)
+   ExecutionMode getExecutionModeForBatch();
+ 
+   void setExecutionModeForBatch(ExecutionMode executionMode);
  }
diff --cc runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 49cdbe8,49cdbe8..257501f
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@@ -38,6 -38,6 +38,7 @@@ import org.apache.beam.sdk.values.Tuple
  import org.apache.beam.sdk.values.WindowingStrategy;
  import org.apache.commons.lang3.SerializationUtils;
  import org.apache.flink.api.common.ExecutionConfig;
++import org.apache.flink.api.common.ExecutionMode;
  import org.apache.flink.api.common.typeinfo.TypeHint;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.streaming.api.CheckpointingMode;
@@@ -84,6 -84,6 +85,7 @@@ public class PipelineOptionsTest 
      assertThat(options.getStateBackend(), is(nullValue()));
      assertThat(options.getMaxBundleSize(), is(1000L));
      assertThat(options.getMaxBundleTimeMills(), is(1000L));
++    assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED));
    }
  
    @Test(expected = Exception.class)