You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/11/23 18:33:32 UTC

flink git commit: [FLINK-3020][streaming] set number of task slots to maximum parallelism in local execution

Repository: flink
Updated Branches:
  refs/heads/master 601e8c607 -> 8cabe67e7


[FLINK-3020][streaming] set number of task slots to maximum parallelism in local execution

This closes #1360.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cabe67e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cabe67e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cabe67e

Branch: refs/heads/master
Commit: 8cabe67e7fb0eb40e7b58cfd52b8e1b0e5bc5268
Parents: 601e8c6
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Nov 17 15:04:31 2015 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Nov 23 18:32:30 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/runtime/jobgraph/JobGraph.java     | 12 ++++++++++++
 .../api/environment/LocalStreamEnvironment.java         |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cabe67e/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index a64d63c..566e44f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -539,6 +539,18 @@ public class JobGraph implements Serializable {
 		}
 	}
 
+	/**
+	 * Gets the maximum parallelism of all operations in this job graph.
+	 * @return The maximum parallelism of this job graph
+	 */
+	public int getMaximumParallelism() {
+		int maxParallelism = -1;
+		for (JobVertex vertex : taskVertices.values()) {
+			maxParallelism = Math.max(vertex.getParallelism(), maxParallelism);
+		}
+		return maxParallelism;
+	}
+
 	@Override
 	public String toString() {
 		return "JobGraph(jobId: " + jobID + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/8cabe67e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index d6ddcff..e76a2c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -91,7 +91,7 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		configuration.addAll(jobGraph.getJobConfiguration());
 
 		configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
-		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getParallelism());
+		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
 		
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);