You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:42 UTC
[2/3] incubator-flink git commit: [runtime] In local mode,
make sure taskmanagers have completed registration before starting a
job.
[runtime] In local mode, make sure taskmanagers have completed registration before starting a job.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef9a3739
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef9a3739
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef9a3739
Branch: refs/heads/master
Commit: ef9a37390dfbd325b3bf2422334f99d22fca2a1a
Parents: ef40691
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 19:14:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 19:14:13 2014 +0100
----------------------------------------------------------------------
.../apache/flink/client/minicluster/NepheleMiniCluster.java | 9 +++++++++
1 file changed, 9 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef9a3739/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index e04006c..aac0786 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -267,6 +267,15 @@ public class NepheleMiniCluster {
while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
Thread.sleep(50);
}
+
+ // make sure that not just the jobmanager has the slots, but also the taskmanager
+ // has figured out its registration. under rare races, calls can be scheduled before that otherwise
+ TaskManager[] tms = getTaskManagers();
+ for (TaskManager tm : tms) {
+ while (tm.getRegisteredId() == null) {
+ Thread.sleep(10);
+ }
+ }
}
private static void initializeIOFormatClasses() {