You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/19 13:52:37 UTC
flink git commit: [FLINK-3764] [tests] Improve LeaderChangeStateCleanupTest stability
Repository: flink
Updated Branches:
refs/heads/master 4be297ec2 -> 716d83209
[FLINK-3764] [tests] Improve LeaderChangeStateCleanupTest stability
The LeaderElectionRetrievalTestingCluster now also waits for the Flink
resource manager to be properly started before trying to send leader
change notification messages to the LeaderRetrievalServices.
This closes #1894
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/716d8320
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/716d8320
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/716d8320
Branch: refs/heads/master
Commit: 716d832098b47ec19c6f5c675eb461d3ba96f60a
Parents: 4be297e
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 15 12:33:58 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 19 13:51:57 2016 +0200
----------------------------------------------------------------------
.../leaderelection/TestingLeaderRetrievalService.java | 9 +++++++--
.../flink/runtime/testutils/TestingResourceManager.java | 2 ++
.../flink/runtime/testingUtils/TestingCluster.scala | 11 +++++++++--
3 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index c44fc2a..e2fc540 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
import java.util.UUID;
@@ -45,7 +46,7 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
@Override
public void start(LeaderRetrievalListener listener) throws Exception {
- this.listener = listener;
+ this.listener = Preconditions.checkNotNull(listener);
if (leaderAddress != null) {
listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
@@ -58,6 +59,10 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
}
public void notifyListener(String address, UUID leaderSessionID) {
- listener.notifyLeaderAddress(address, leaderSessionID);
+ if (listener != null) {
+ listener.notifyLeaderAddress(address, leaderSessionID);
+ } else {
+ throw new IllegalStateException("The retrieval service has not been started properly.");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
index 401299c..2422925 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -85,6 +85,8 @@ public class TestingResourceManager extends StandaloneResourceManager {
} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
waitForShutdown.add(sender());
+ } else if (message instanceof TestingMessages.Alive$) {
+ sender().tell(Messages.getAcknowledge(), self());
} else {
super.handleMessage(message);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 7c7e1db..5b08a45 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -211,11 +211,18 @@ class TestingCluster(
val jmsAliveFutures = jobManagerActors map {
_ map {
- tm => (tm ? Alive)(timeout)
+ jm => (jm ? Alive)(timeout)
+ }
+ } getOrElse(Seq())
+
+ val resourceManagersAliveFutures = resourceManagerActors map {
+ _ map {
+ rm => (rm ? Alive)(timeout)
}
} getOrElse(Seq())
- val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures)
+ val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures ++
+ resourceManagersAliveFutures)
Await.ready(combinedFuture, timeout)
}