You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/04/19 13:52:37 UTC

flink git commit: [FLINK-3764] [tests] Improve LeaderChangeStateCleanupTest stability

Repository: flink
Updated Branches:
  refs/heads/master 4be297ec2 -> 716d83209


[FLINK-3764] [tests] Improve LeaderChangeStateCleanupTest stability

The LeaderElectionRetrievalTestingCluster now also waits for the Flink
resource manager to be properly started before trying to send leader
change notification messages to the LeaderRetrievalServices.

This closes #1894


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/716d8320
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/716d8320
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/716d8320

Branch: refs/heads/master
Commit: 716d832098b47ec19c6f5c675eb461d3ba96f60a
Parents: 4be297e
Author: Till Rohrmann <tr...@apache.org>
Authored: Fri Apr 15 12:33:58 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Apr 19 13:51:57 2016 +0200

----------------------------------------------------------------------
 .../leaderelection/TestingLeaderRetrievalService.java    |  9 +++++++--
 .../flink/runtime/testutils/TestingResourceManager.java  |  2 ++
 .../flink/runtime/testingUtils/TestingCluster.scala      | 11 +++++++++--
 3 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
index c44fc2a..e2fc540 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
 
 import java.util.UUID;
 
@@ -45,7 +46,7 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 
 	@Override
 	public void start(LeaderRetrievalListener listener) throws Exception {
-		this.listener = listener;
+		this.listener = Preconditions.checkNotNull(listener);
 
 		if (leaderAddress != null) {
 			listener.notifyLeaderAddress(leaderAddress, leaderSessionID);
@@ -58,6 +59,10 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService {
 	}
 
 	public void notifyListener(String address, UUID leaderSessionID) {
-		listener.notifyLeaderAddress(address, leaderSessionID);
+		if (listener != null) {
+			listener.notifyLeaderAddress(address, leaderSessionID);
+		} else {
+			throw new IllegalStateException("The retrieval service has not been started properly.");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
index 401299c..2422925 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestingResourceManager.java
@@ -85,6 +85,8 @@ public class TestingResourceManager extends StandaloneResourceManager {
 
 		} else if (message instanceof TestingMessages.NotifyOfComponentShutdown$) {
 			waitForShutdown.add(sender());
+		} else if (message instanceof TestingMessages.Alive$) {
+			sender().tell(Messages.getAcknowledge(), self());
 		} else {
 			super.handleMessage(message);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/716d8320/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 7c7e1db..5b08a45 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -211,11 +211,18 @@ class TestingCluster(
 
     val jmsAliveFutures = jobManagerActors map {
       _ map {
-        tm => (tm ? Alive)(timeout)
+        jm => (jm ? Alive)(timeout)
+      }
+    } getOrElse(Seq())
+
+    val resourceManagersAliveFutures = resourceManagerActors map {
+      _ map {
+        rm => (rm ? Alive)(timeout)
       }
     } getOrElse(Seq())
 
-    val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures)
+    val combinedFuture = Future.sequence(tmsAliveFutures ++ jmsAliveFutures ++
+                                           resourceManagersAliveFutures)
 
     Await.ready(combinedFuture, timeout)
   }