You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/04 17:02:23 UTC

[beam] 01/01: Merge pull request #7128: [BEAM-6077] If available, use max_parallelism for splitting unbounded source

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a3a8a32dd64290a44c26bedff74b07d5ad571c82
Merge: e5d9cf4 5565b0a
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Dec 4 18:02:15 2018 +0100

    Merge pull request #7128: [BEAM-6077] If available, use max_parallelism for splitting unbounded source

 .../runners/flink/FlinkExecutionEnvironments.java  |   3 +
 .../beam/runners/flink/FlinkPipelineOptions.java   |   8 +
 .../flink/FlinkStreamingTransformTranslators.java  |  26 ++-
 .../flink/FlinkExecutionEnvironmentsTest.java      |  14 ++
 .../FlinkStreamingTransformTranslatorsTest.java    | 238 +++++++++++++++++++++
 .../beam/runners/flink/PipelineOptionsTest.java    |   1 +
 6 files changed, 281 insertions(+), 9 deletions(-)

diff --cc runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
index 25854cf,55c3865..8656860
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java
@@@ -191,8 -150,11 +191,11 @@@ public class FlinkExecutionEnvironment
      // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits.
      final int parallelism =
          determineParallelism(
 -            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfigDir);
 +            options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfig);
      flinkStreamEnv.setParallelism(parallelism);
+     if (options.getMaxParallelism() > 0) {
+       flinkStreamEnv.setMaxParallelism(options.getMaxParallelism());
+     }
      // set parallelism in the options (required by some execution code)
      options.setParallelism(parallelism);