You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/02/17 12:53:19 UTC

flink git commit: [FLINK-5616] [tests] Harden YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader

Repository: flink
Updated Branches:
  refs/heads/master c04941d2a -> cc9334a46


[FLINK-5616] [tests] Harden YarnIntraNonHaMasterServicesTest.testClosingReportsToLeader

Prevent the race condition between shutting down the leader election service and the
contender becoming the leader by waiting on a LeaderRetrievalListener. The problem
was that if the service had been shut down before the contender became the leader,
the contender would not be notified about the shutdown via the handleError method.

Add correct waiting behaviour using Mockito's verify statement.

This closes #3327.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cc9334a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cc9334a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cc9334a4

Branch: refs/heads/master
Commit: cc9334a4694b06abde2723548f9576256ae063db
Parents: c04941d
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Feb 15 18:15:50 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Feb 17 13:52:56 2017 +0100

----------------------------------------------------------------------
 .../leaderelection/SingleLeaderElectionService.java |  2 ++
 .../YarnIntraNonHaMasterServicesTest.java           | 16 ++++++++++++++--
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
index 26e3cbf..96e1390 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/leaderelection/SingleLeaderElectionService.java
@@ -92,6 +92,8 @@ public class SingleLeaderElectionService implements LeaderElectionService {
 		this.notificationExecutor = checkNotNull(notificationsDispatcher);
 		this.leaderId = checkNotNull(leaderId);
 		this.listeners = new HashSet<>();
+
+		shutdown = false;
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cc9334a4/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
index 0e7bf0f..64c22d2 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/highavailability/YarnIntraNonHaMasterServicesTest.java
@@ -22,8 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
 import org.junit.AfterClass;
@@ -41,13 +44,14 @@ import java.util.Random;
 import java.util.UUID;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class YarnIntraNonHaMasterServicesTest {
+public class YarnIntraNonHaMasterServicesTest extends TestLogger {
 
 	private static final Random RND = new Random();
 
@@ -112,12 +116,20 @@ public class YarnIntraNonHaMasterServicesTest {
 
 		try (YarnHighAvailabilityServices services = new YarnIntraNonHaMasterServices(flinkConfig, hadoopConfig)) {
 			final LeaderElectionService elector = services.getResourceManagerLeaderElectionService();
+			final LeaderRetrievalService retrieval = services.getResourceManagerLeaderRetriever();
 			final LeaderContender contender = mockContender(elector);
+			final LeaderRetrievalListener listener = mock(LeaderRetrievalListener.class);
 
 			elector.start(contender);
+			retrieval.start(listener);
+
+			// wait until the contender has become the leader
+			verify(listener, timeout(1000L).times(1)).notifyLeaderAddress(anyString(), any(UUID.class));
+
+			// now we can close the election service
 			services.close();
 
-			verify(contender, timeout(100).times(1)).handleError(any(Exception.class));
+			verify(contender, timeout(1000L).times(1)).handleError(any(Exception.class));
 		}
 	}