You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/06/17 12:50:40 UTC

flink git commit: [FLINK-2733] [tests] Harden ZooKeeperLeaderElectionTest

Repository: flink
Updated Branches:
  refs/heads/master 4d8cbec4f -> 1a6bab3ef


[FLINK-2733] [tests] Harden ZooKeeperLeaderElectionTest

Hardens ZooKeeperLeaderElectionTest by allowing the testing listener to return
outdated leader information. This can happen if the ZooKeeper connection
was suspended and the new leader information has not yet been sent to the
testing listener. In this case, the testing listener will be queried again
to return the current leader information.

Add debug statements to ZooKeeperLeaderElectionTest.testZooKeeperReelection

This closes #2103.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a6bab3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a6bab3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a6bab3e

Branch: refs/heads/master
Commit: 1a6bab3ef76805685044cf4521e32315169f9033
Parents: 4d8cbec
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Jun 6 17:18:59 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 17 14:49:59 2016 +0200

----------------------------------------------------------------------
 .../ZooKeeperLeaderElectionService.java         |  4 +-
 .../runtime/leaderelection/TestingListener.java |  4 +-
 .../ZooKeeperLeaderElectionTest.java            | 39 +++++++++++++++-----
 3 files changed, 34 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index e9aaaf8..0fa6a9e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -337,7 +337,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 				break;
 			case SUSPENDED:
 				LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress()
-					+ "no longer participates in the leader election.");
+					+ " no longer participates in the leader election.");
 				break;
 			case RECONNECTED:
 				LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
@@ -345,7 +345,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 			case LOST:
 				// Maybe we have to throw an exception here to terminate the JobManager
 				LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress()
-					+ "no longer participates in the leader election.");
+					+ " no longer participates in the leader election.");
 				break;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
index 54ee822..87decc7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingListener.java
@@ -47,7 +47,7 @@ public class TestingListener implements LeaderRetrievalListener {
 		return leaderSessionID;
 	}
 
-	public void waitForNewLeader(long timeout) throws Exception {
+	public String waitForNewLeader(long timeout) throws Exception {
 		long start = System.currentTimeMillis();
 		long curTimeout;
 
@@ -72,6 +72,8 @@ public class TestingListener implements LeaderRetrievalListener {
 		}
 
 		oldAddress = address;
+
+		return address;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1a6bab3e/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 34a582f..6ebf2dd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -38,9 +38,12 @@ import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
+import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.ByteArrayOutputStream;
@@ -58,6 +61,8 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 	private static final String TEST_URL = "akka//user/jobmanager";
 	private static final FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS);
 
+	private static Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionTest.class);
+
 	@Before
 	public void before() {
 		try {
@@ -132,7 +137,9 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		configuration.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, testingServer.getConnectString());
 		configuration.setString(ConfigConstants.RECOVERY_MODE, "zookeeper");
 
-		int num = 50;
+		Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+		int num = 25;
 
 		ZooKeeperLeaderElectionService[] leaderElectionService = new ZooKeeperLeaderElectionService[num];
 		TestingContender[] contenders = new TestingContender[num];
@@ -143,40 +150,52 @@ public class ZooKeeperLeaderElectionTest extends TestLogger {
 		try {
 			leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(configuration);
 
+			LOG.debug("Start leader retrieval service for the TestingListener.");
+
 			leaderRetrievalService.start(listener);
 
 			for (int i = 0; i < num; i++) {
 				leaderElectionService[i] = ZooKeeperUtils.createLeaderElectionService(configuration);
 				contenders[i] = new TestingContender(TEST_URL + "_" + i, leaderElectionService[i]);
 
+				LOG.debug("Start leader election service for contender #{}.", i);
+
 				leaderElectionService[i].start(contenders[i]);
 			}
 
 			String pattern = TEST_URL + "_" + "(\\d+)";
 			Pattern regex = Pattern.compile(pattern);
 
-			for (int i = 0; i < num; i++) {
-				listener.waitForNewLeader(timeout.toMillis());
+			int numberSeenLeaders = 0;
 
-				String address = listener.getAddress();
+			while (deadline.hasTimeLeft() && numberSeenLeaders < num) {
+				LOG.debug("Wait for new leader #{}.", numberSeenLeaders);
+				String address = listener.waitForNewLeader(deadline.timeLeft().toMillis());
 
 				Matcher m = regex.matcher(address);
 
 				if (m.find()) {
 					int index = Integer.parseInt(m.group(1));
 
-					// check that the leader session ID of the listeners and the leader are equal
-					assertEquals(listener.getLeaderSessionID(), contenders[index].getLeaderSessionID());
-					assertEquals(TEST_URL + "_" + index, listener.getAddress());
+					TestingContender contender = contenders[index];
 
-					// kill the election service of the leader
-					leaderElectionService[index].stop();
-					leaderElectionService[index] = null;
+					// check that the retrieval service has retrieved the correct leader
+					if (address.equals(contender.getAddress()) && listener.getLeaderSessionID().equals(contender.getLeaderSessionID())) {
+						// kill the election service of the leader
+						LOG.debug("Stop leader election service of contender #{}.", numberSeenLeaders);
+						leaderElectionService[index].stop();
+						leaderElectionService[index] = null;
+
+						numberSeenLeaders++;
+					}
 				} else {
 					fail("Did not find the leader's index.");
 				}
 			}
 
+			assertFalse(deadline.isOverdue());
+			assertEquals(num, numberSeenLeaders);
+
 		} finally {
 			if (leaderRetrievalService != null) {
 				leaderRetrievalService.stop();