You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/07 11:12:42 UTC

[2/3] incubator-flink git commit: [runtime] In local mode, make sure taskmanagers have completed registration before starting a job.

[runtime] In local mode, make sure taskmanagers have completed registration before starting a job.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ef9a3739
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ef9a3739
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ef9a3739

Branch: refs/heads/master
Commit: ef9a37390dfbd325b3bf2422334f99d22fca2a1a
Parents: ef40691
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 6 19:14:13 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 6 19:14:13 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/client/minicluster/NepheleMiniCluster.java | 9 +++++++++
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ef9a3739/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index e04006c..aac0786 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -267,6 +267,15 @@ public class NepheleMiniCluster {
 		while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
 			Thread.sleep(50);
 		}
+		
+		// make sure that not just the jobmanager has the slots, but also the taskmanager
+		// has figured out its registration. under rare races, calls can be scheduled before that otherwise
+		TaskManager[] tms = getTaskManagers();
+		for (TaskManager tm : tms) {
+			while (tm.getRegisteredId() == null) {
+				Thread.sleep(10);
+			}
+		}
 	}
 	
 	private static void initializeIOFormatClasses() {