You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/09/22 16:46:41 UTC

git commit: Wait for slots instead of TaskManagers in test environment

Repository: incubator-flink
Updated Branches:
  refs/heads/master 582cd0304 -> 24a9b1e58


Wait for slots instead of TaskManagers in test environment

Instead of waiting for the number of connected task managers, the local
mini cluster used for tests now waits until the total number of expected
slots (number of task managers times slots per task manager) is available
to the scheduler. Waiting only for the number of connected task managers
might result in races in rare situations.

In addition, rename task tracker (sic) to task manager in the mini cluster
and in the utility and test classes that use it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/24a9b1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/24a9b1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/24a9b1e5

Branch: refs/heads/master
Commit: 24a9b1e5801bb7c00997520329b2ef507f48764a
Parents: 582cd03
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Sep 22 16:34:27 2014 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Sep 22 16:34:27 2014 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/util/ClusterUtil.java  |  2 +-
 .../flink/client/minicluster/NepheleMiniCluster.java  | 14 +++++++-------
 .../org/apache/flink/test/util/AbstractTestBase.java  | 10 +++++-----
 .../test/exampleScalaPrograms/WordCountITCase.java    |  2 +-
 .../PackagedProgramEndToEndITCase.java                |  2 +-
 .../flink/test/runtime/NetworkStackThroughput.java    |  2 +-
 6 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index ec73aef..76d7aaf 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -48,7 +48,7 @@ public class ClusterUtil {
 
 		NepheleMiniCluster exec = new NepheleMiniCluster();
 		exec.setMemorySize(memorySize);
-		exec.setNumTaskTracker(numberOfTaskTrackers);
+		exec.setNumTaskManager(numberOfTaskTrackers);
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running on mini cluster");
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index f5e463f..bb9f890 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -62,7 +62,7 @@ public class NepheleMiniCluster {
 	
 	private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
 
-	private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER;
+	private int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
 
 	private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 	
@@ -157,9 +157,9 @@ public class NepheleMiniCluster {
 		this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
 	}
 
-	public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
+	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
 
-	public int getNumTaskTracker() { return numTaskTracker; }
+	public int getNumTaskManager() { return numTaskManager; }
 
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
 
@@ -202,7 +202,7 @@ public class NepheleMiniCluster {
 			} else {
 				Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
 					taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles,
-						defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker);
+						defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskManager);
 				GlobalConfiguration.includeConfiguration(conf);
 			}
 
@@ -226,7 +226,7 @@ public class NepheleMiniCluster {
 			// start the job manager
 			jobManager = new JobManager(ExecutionMode.LOCAL);
 	
-			waitForJobManagerToBecomeReady(numTaskTracker);
+			waitForJobManagerToBecomeReady(taskManagerNumSlots * numTaskManager);
 		}
 	}
 
@@ -243,8 +243,8 @@ public class NepheleMiniCluster {
 	// Network utility methods
 	// ------------------------------------------------------------------------
 	
-	private void waitForJobManagerToBecomeReady(int numTaskManagers) throws InterruptedException {
-		while (jobManager.getNumberOfTaskManagers() < numTaskManagers) {
+	private void waitForJobManagerToBecomeReady(int numSlots) throws InterruptedException {
+		while (jobManager.getNumberOfSlotsAvailableToScheduler() < numSlots) {
 			Thread.sleep(50);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 3ef137e..9613052 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -55,7 +55,7 @@ public abstract class AbstractTestBase {
 
 	protected static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
 
-	protected static final int DEFAULT_NUM_TASK_TRACKER = 1;
+	protected static final int DEFAULT_NUM_TASK_MANAGER = 1;
 
 	protected final Configuration config;
 	
@@ -65,7 +65,7 @@ public abstract class AbstractTestBase {
 
 	protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
-	protected int numTaskTracker = DEFAULT_NUM_TASK_TRACKER;
+	protected int numTaskManager = DEFAULT_NUM_TASK_MANAGER;
 
 	public AbstractTestBase(Configuration config) {
 		verifyJvmOptions();
@@ -89,7 +89,7 @@ public abstract class AbstractTestBase {
 		this.executor.setLazyMemoryAllocation(true);
 		this.executor.setMemorySize(TASK_MANAGER_MEMORY_SIZE);
 		this.executor.setTaskManagerNumSlots(taskManagerNumSlots);
-		this.executor.setNumTaskTracker(this.numTaskTracker);
+		this.executor.setNumTaskManager(this.numTaskManager);
 		this.executor.start();
 	}
 
@@ -115,9 +115,9 @@ public abstract class AbstractTestBase {
 
 	public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; }
 
-	public int getNumTaskTracker() { return numTaskTracker; }
+	public int getNumTaskManager() { return numTaskManager; }
 
-	public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; }
+	public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; }
 
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
index c0e1db0..f9b15e3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/WordCountITCase.java
@@ -29,7 +29,7 @@ public class WordCountITCase extends JavaProgramTestBase {
 
 	public WordCountITCase(){
 		setDegreeOfParallelism(4);
-		setNumTaskTracker(2);
+		setNumTaskManager(2);
 		setTaskManagerNumSlots(2);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index d77c653..a27a9cf 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -59,7 +59,7 @@ public class PackagedProgramEndToEndITCase {
 			String jarPath = "target/maven-test-jar.jar";
 
 			// run KMeans
-			cluster.setNumTaskTracker(2);
+			cluster.setNumTaskManager(2);
 			cluster.setTaskManagerNumSlots(2);
 			cluster.start();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/24a9b1e5/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
index 7c79e35..af9bb9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughput.java
@@ -82,7 +82,7 @@ public class NetworkStackThroughput {
 				throw new RuntimeException("The test case defines a parallelism that is not a multiple of the slots per task manager.");
 			}
 
-			setNumTaskTracker(parallelism / numSlots);
+			setNumTaskManager(parallelism / numSlots);
 			setTaskManagerNumSlots(numSlots);
 		}