You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/09/21 11:16:34 UTC
flink git commit: [test-stability] Hardens
TaskManagerRegistrationTest by increasing the timeouts
Repository: flink
Updated Branches:
refs/heads/master 031aa4d2a -> 127cd02d6
[test-stability] Hardens TaskManagerRegistrationTest by increasing the timeouts
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/127cd02d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/127cd02d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/127cd02d
Branch: refs/heads/master
Commit: 127cd02d6232b708509fe60048eb9593a6995845
Parents: 031aa4d
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Sep 18 14:14:08 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Mon Sep 21 11:14:54 2015 +0200
----------------------------------------------------------------------
.../TaskManagerRegistrationTest.java | 43 ++++++++++----------
1 file changed, 22 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/127cd02d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index 85123a1..2b5b709 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -75,13 +75,15 @@ public class TaskManagerRegistrationTest extends TestLogger {
private static Configuration config;
+ private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
+
@BeforeClass
public static void startActorSystem() {
config = new Configuration();
- config.getString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
- config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
- config.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
- config.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
+ config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
+ config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
+ config.setDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0);
actorSystem = AkkaUtils.createLocalActorSystem(config);
}
@@ -127,14 +129,14 @@ public class TaskManagerRegistrationTest extends TestLogger {
// check that the TaskManagers are registered
Future<Object> responseFuture1 = taskManager1.ask(
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
- new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+ timeout);
Future<Object> responseFuture2 = taskManager2.ask(
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
- new FiniteDuration(5000, TimeUnit.MILLISECONDS));
+ timeout);
- Object response1 = Await.result(responseFuture1, new FiniteDuration(5, TimeUnit.SECONDS));
- Object response2 = Await.result(responseFuture2, new FiniteDuration(5, TimeUnit.SECONDS));
+ Object response1 = Await.result(responseFuture1, timeout);
+ Object response2 = Await.result(responseFuture2, timeout);
// this is a hack to work around the way Java can interact with scala case objects
Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
@@ -144,9 +146,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
// check that the JobManager has 2 TaskManagers registered
Future<Object> numTaskManagersFuture = jobManager.ask(
JobManagerMessages.getRequestNumberRegisteredTaskManager(),
- new FiniteDuration(1000, TimeUnit.MILLISECONDS));
+ timeout);
- Integer count = (Integer) Await.result(numTaskManagersFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+ Integer count = (Integer) Await.result(numTaskManagersFuture, timeout);
assertEquals(2, count.intValue());
}
catch (Exception e) {
@@ -170,6 +172,8 @@ public class TaskManagerRegistrationTest extends TestLogger {
ActorGateway jobManager = null;
ActorGateway taskManager = null;
+ FiniteDuration delayedTimeout = timeout.$times(3);
+
try {
// start a TaskManager that tries to register at the JobManager before the JobManager is
// available. we give it the regular JobManager akka URL
@@ -191,9 +195,9 @@ public class TaskManagerRegistrationTest extends TestLogger {
// check that the TaskManagers are registered
Future<Object> responseFuture = taskManager.ask(
TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(),
- new FiniteDuration(30000, TimeUnit.MILLISECONDS));
+ delayedTimeout);
- Object response = Await.result(responseFuture, new FiniteDuration(30, TimeUnit.SECONDS));
+ Object response = Await.result(responseFuture, delayedTimeout);
// this is a hack to work around the way Java can interact with scala case objects
Class<?> confirmClass = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
@@ -245,7 +249,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
final ActorGateway tm = taskManager;
- new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+ new Within(timeout) {
@Override
protected void run() {
@@ -287,7 +291,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
final ActorGateway taskManagerGateway = taskManager;
// check and decline initial registration
- new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+ new Within(timeout) {
@Override
protected void run() {
@@ -353,7 +357,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
final ActorGateway tm = taskManagerGateway;
// validate initial registration
- new Within(new FiniteDuration(2, TimeUnit.SECONDS)) {
+ new Within(timeout) {
@Override
protected void run() {
@@ -375,9 +379,6 @@ public class TaskManagerRegistrationTest extends TestLogger {
final ActorGateway gateway = fakeJobManager1Gateway;
- // wait for the killing to be completed
- final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-
new Within(timeout) {
@Override
@@ -419,7 +420,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
final ActorGateway fakeJM2GatewayClosure = fakeJobManager2Gateway;
// expect the next registration
- new Within(new FiniteDuration(10, TimeUnit.SECONDS)) {
+ new Within(timeout) {
@Override
protected void run() {
@@ -478,7 +479,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
watch(taskManager);
- expectTerminated(new FiniteDuration(20, TimeUnit.SECONDS), taskManager);
+ expectTerminated(timeout, taskManager);
}
catch (Exception e) {
e.printStackTrace();
@@ -527,7 +528,7 @@ public class TaskManagerRegistrationTest extends TestLogger {
final UUID falseLeaderSessionID = UUID.randomUUID();
final UUID trueLeaderSessionID = null;
- new Within(new FiniteDuration(20, TimeUnit.SECONDS)) {
+ new Within(timeout) {
@Override
protected void run() {