You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/04 18:37:29 UTC
[3/3] flink git commit: [jobmanager] Move auto-parallelism of vertices before master initialization
[jobmanager] Move auto-parallelism of vertices before master initialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6f9f993
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6f9f993
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6f9f993
Branch: refs/heads/release-0.8
Commit: a6f9f9939ca03026baeefb3bd0876b90068b7682
Parents: 785f204
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Mar 4 18:08:22 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Mar 4 18:08:22 2015 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/jobmanager/JobManager.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a6f9f993/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 223c6c6..ac39047 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -384,12 +384,13 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName()));
}
- // master side initialization
- vertex.initializeOnMaster(userCodeLoader);
-
+ // set the parallelism in case of auto parallelism
if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
vertex.setParallelism(numSlots);
}
+
+ // master side initialization
+ vertex.initializeOnMaster(userCodeLoader);
}
// first topologically sort the job vertices to form the basis of creating the execution graph