Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/04 17:02:23 UTC
[beam] 01/01: Merge pull request #7128: [BEAM-6077] If available, use max_parallelism for splitting unbounded source
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
commit a3a8a32dd64290a44c26bedff74b07d5ad571c82
Merge: e5d9cf4 5565b0a
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Dec 4 18:02:15 2018 +0100

    Merge pull request #7128: [BEAM-6077] If available, use max_parallelism for splitting unbounded source

 .../runners/flink/FlinkExecutionEnvironments.java  |   3 +
 .../beam/runners/flink/FlinkPipelineOptions.java   |   8 +
 .../flink/FlinkStreamingTransformTranslators.java  |  26 ++-
 .../flink/FlinkExecutionEnvironmentsTest.java      |  14 ++
 .../FlinkStreamingTransformTranslatorsTest.java    | 238 +++++++++++++++++++++
 .../beam/runners/flink/PipelineOptionsTest.java    |   1 +
 6 files changed, 281 insertions(+), 9 deletions(-)

diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 25854cf,55c3865..8656860
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@@ -191,8 -150,11 +191,11 @@@ public class FlinkExecutionEnvironment
      // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits.
      final int parallelism =
          determineParallelism(
 -            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfigDir);
 +            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfig);
      flinkStreamEnv.setParallelism(parallelism);
+     if (options.getMaxParallelism() > 0) {
+       flinkStreamEnv.setMaxParallelism(options.getMaxParallelism());
+     }
      // set parallelism in the options (required by some execution code)
      options.setParallelism(parallelism);
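
For readers skimming the hunk above, the guard can be sketched in isolation. This is a minimal illustration, not the Beam or Flink code: `FakeStreamEnv` is a hypothetical stand-in for the stream environment, and `splitCount` is only an assumed reading of the commit title (prefer max parallelism for the number of source splits when it is set). The actual splitting logic lives in FlinkStreamingTransformTranslators.java, which is not shown in this message.

```java
// Hypothetical stand-in for the two setters used in the hunk; this is not the Flink API.
final class FakeStreamEnv {
  private int parallelism = 1;
  private int maxParallelism = -1; // in this sketch, -1 means "not set" (assumption)

  void setParallelism(int p) { parallelism = p; }
  void setMaxParallelism(int p) { maxParallelism = p; }
  int getParallelism() { return parallelism; }
  int getMaxParallelism() { return maxParallelism; }
}

final class MaxParallelismSketch {
  // Mirrors the guard in the hunk: forward the max parallelism only when it is positive.
  static void configure(FakeStreamEnv env, int parallelism, int maxParallelismOption) {
    env.setParallelism(parallelism);
    if (maxParallelismOption > 0) {
      env.setMaxParallelism(maxParallelismOption);
    }
  }

  // Assumed reading of the commit title: use max parallelism as the split count
  // when it is available, otherwise fall back to the current parallelism.
  static int splitCount(int parallelism, int maxParallelism) {
    return maxParallelism > 0 ? maxParallelism : parallelism;
  }

  public static void main(String[] args) {
    FakeStreamEnv env = new FakeStreamEnv();
    configure(env, 4, 128);
    System.out.println(splitCount(env.getParallelism(), env.getMaxParallelism()));
  }
}
```

With a non-positive option value the stand-in environment keeps its default, which matches the intent of the `> 0` check in the hunk: an unset max parallelism is never pushed into the environment.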