You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/09/21 11:16:34 UTC

flink git commit: [test-stability] Hardens TaskManagerRegistrationTest by increasing the timeouts

Repository: flink
Updated Branches:
  refs/heads/master 031aa4d2a -> 127cd02d6


[test-stability] Hardens TaskManagerRegistrationTest by increasing the timeouts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/127cd02d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/127cd02d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/127cd02d

Branch: refs/heads/master
Commit: 127cd02d6232b708509fe60048eb9593a6995845
Parents: 031aa4d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Sep 18 14:14:08 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 21 11:14:54 2015 +0200

----------------------------------------------------------------------
 .../TaskManagerRegistrationTest.java            | 43 ++++++++++----------
 1 file changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/127cd02d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 85123a1..2b5b709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -75,13 +75,15 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 	private static Configuration config;
 
+	private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+
 	@BeforeClass
 	public static void startActorSystem() {
 		config = new Configuration();
-		config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
-		config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
-		config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
-		config.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+		config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+		config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
 
 		actorSystem = AkkaUtils.createLocalActorSystem(config);
 	}
@@ -127,14 +129,14 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// check that the TaskManagers are registered
 				Future<Object> responseFuture1 = taskManager1.ask(
 						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-						new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+						timeout);
 
 				Future<Object> responseFuture2 = taskManager2.ask(
 						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-						new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+						timeout);
 
-				Object response1 = Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
-				Object response2 = Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
+				Object response1 = Await.result(responseFuture1, timeout);
+				Object response2 = Await.result(responseFuture2, timeout);
 
 				// this is a hack to work around the way Java can interact with scala case objects
 				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
@@ -144,9 +146,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// check that the JobManager has 2 TaskManagers registered
 				Future<Object> numTaskManagersFuture = jobManager.ask(
 						JobManagerMessages.getRequestNumberRegisteredTaskManager(),
-						new FiniteDuration(1000, TimeUnit.MILLISECONDS));
+						timeout);
 
-				Integer count = (Integer) Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+				Integer count = (Integer) Await.result(numTaskManagersFuture, timeout);
 				assertEquals(2, count.intValue());
 			}
 			catch (Exception e) {
@@ -170,6 +172,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
 			ActorGateway jobManager = null;
 			ActorGateway taskManager = null;
 
+			FiniteDuration delayedTimeout = timeout.$times(3);
+
 			try {
 				// start a TaskManager that tries to register at the JobManager before the JobManager is
 				// available. we give it the regular JobManager akka URL
@@ -191,9 +195,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				// check that the TaskManagers are registered
 				Future<Object> responseFuture = taskManager.ask(
 						TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
-						new FiniteDuration(30000, TimeUnit.MILLISECONDS));
+						delayedTimeout);
 
-				Object response = Await.result(responseFuture, new FiniteDuration(30, TimeUnit.SECONDS));
+				Object response = Await.result(responseFuture, delayedTimeout);
 
 				// this is a hack to work around the way Java can interact with scala case objects
 				Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
@@ -245,7 +249,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				final ActorGateway tm = taskManager;
 
-				new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+				new Within(timeout) {
 
 					@Override
 					protected void run() {
@@ -287,7 +291,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				final ActorGateway taskManagerGateway = taskManager;
 
 				// check and decline initial registration
-				new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+				new Within(timeout) {
 
 					@Override
 					protected void run() {
@@ -353,7 +357,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				final ActorGateway tm = taskManagerGateway;
 
 				// validate initial registration
-				new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+				new Within(timeout) {
 
 					@Override
 					protected void run() {
@@ -375,9 +379,6 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 				final ActorGateway gateway = fakeJobManager1Gateway;
 
-				// wait for the killing to be completed
-				final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-
 				new Within(timeout) {
 
 					@Override
@@ -419,7 +420,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				final ActorGateway fakeJM2GatewayClosure = fakeJobManager2Gateway;
 
 				// expect the next registration
-				new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+				new Within(timeout) {
 
 					@Override
 					protected void run() {
@@ -478,7 +479,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 
 					watch(taskManager);
 
-					expectTerminated(new FiniteDuration(20, TimeUnit.SECONDS), taskManager);
+					expectTerminated(timeout, taskManager);
 				}
 				catch (Exception e) {
 					e.printStackTrace();
@@ -527,7 +528,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
 				final UUID falseLeaderSessionID = UUID.randomUUID();
 				final UUID trueLeaderSessionID = null;
 
-				new Within(new FiniteDuration(20, TimeUnit.SECONDS)) {
+				new Within(timeout) {
 
 					@Override
 					protected void run() {