You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/11/01 09:54:28 UTC
[beam] 01/01: Merge pull request #6897: [BEAM-5464] Allow to configure ExecutionMode for batch pipelines
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 03c08377d3b13fcdcb489cdd46ab24c70e58705d
Merge: d046063 329e51f
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Thu Nov 1 10:22:56 2018 +0100
Merge pull request #6897: [BEAM-5464] Allow to configure ExecutionMode for batch pipelines
.../beam/runners/flink/FlinkExecutionEnvironments.java | 3 +++
.../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 12 ++++++++++++
.../org/apache/beam/runners/flink/PipelineOptionsTest.java | 2 ++
3 files changed, 17 insertions(+)
diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index b695c37,a84b964..234b457
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@@ -187,4 -189,12 +189,14 @@@ public interface FlinkPipelineOption
Long getLatencyTrackingInterval();
void setLatencyTrackingInterval(Long interval);
+
+ @Description(
- "Flink mode for data exchange for batch pipeline. "
- + "Reference {@link org.apache.flink.api.common.ExecutionMode}")
++ "Flink mode for data exchange of batch pipelines. "
++ + "Reference {@link org.apache.flink.api.common.ExecutionMode}. "
++ + "Set this to BATCH_FORCED if pipelines get blocked, see "
++ + "https://issues.apache.org/jira/browse/FLINK-10672")
+ @Default.Enum(PIPELINED)
+ ExecutionMode getExecutionModeForBatch();
+
+ void setExecutionModeForBatch(ExecutionMode executionMode);
}
diff --cc runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index 49cdbe8,49cdbe8..257501f
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@@ -38,6 -38,6 +38,7 @@@ import org.apache.beam.sdk.values.Tuple
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.ExecutionConfig;
++import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
@@@ -84,6 -84,6 +85,7 @@@ public class PipelineOptionsTest
assertThat(options.getStateBackend(), is(nullValue()));
assertThat(options.getMaxBundleSize(), is(1000L));
assertThat(options.getMaxBundleTimeMills(), is(1000L));
++ assertThat(options.getExecutionModeForBatch(), is(ExecutionMode.PIPELINED));
}
@Test(expected = Exception.class)