You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2020/03/12 16:11:05 UTC
[zookeeper] branch branch-3.5 updated: ZOOKEEPER-2164: Quorum
members can not rejoin after restart
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 800cc26 ZOOKEEPER-2164: Quorum members can not rejoin after restart
800cc26 is described below
commit 800cc26a768fceda9ea42fe8121acf45fe781c57
Author: Suhas Dantkale <sd...@salesforce.com>
AuthorDate: Thu Mar 12 17:10:43 2020 +0100
ZOOKEEPER-2164: Quorum members can not rejoin after restart
(Ported from Mate Szalay-Beko's work in the master branch.
Refer to https://github.com/apache/zookeeper/pull/1254)
Ever since ZOOKEEPER-107 (released in 3.5.0), servers send their own
addresses in the initial connection request. The receiving server can
use this address to send back a new connection request if the receiver
wins the challenge (the server with the larger ID keeps the connection).
If the server config contains a wildcard address (e.g. 0.0.0.0 in the case
of IPv4), then the first connection request sent by A to B will contain
this address. If the ID of A is smaller than the ID of B, then A will
lose the challenge, and the second connection request sent back by B
will never reach A, because B will send its initial message to 0.0.0.0.
So in any 3.5+ ZooKeeper, if wildcard addresses are used in the configs,
some servers may never be able to rejoin the quorum after they are
restarted.
In 3.5+, for backward compatibility reasons (needed during rolling
upgrades), there is a version of the QuorumCnxManager.connectOne()
method that needs no election address but uses the last known address
to initiate the connection. In this commit, we simply call this method
if the received address is a wildcard address.
I also added a few restart-related tests to make sure that restart
still works when we don't use wildcard addresses. We cannot test
the original error with unit tests, as it would require starting
the quorum on multiple hosts.
I also manually tested rolling restarts both with and without
wildcard addresses in the config.
Reviewer: Mate Szalay-Beko <szalay.beko.mate@gmail.com>
Author: Suhas Dantkale <sd...@salesforce.com>
Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>
Closes #1256 from suhasdantkale/suhas/ZOOKEEPER-2164
---
.../zookeeper/server/quorum/QuorumCnxManager.java | 64 ++++++++---
.../test/java/org/apache/zookeeper/ZKTestCase.java | 33 +++++-
.../org/apache/zookeeper/test/CnxManagerTest.java | 28 +++--
.../apache/zookeeper/test/QuorumRestartTest.java | 128 +++++++++++++++++++++
.../java/org/apache/zookeeper/test/QuorumUtil.java | 22 +++-
5 files changed, 249 insertions(+), 26 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 22a24bc..4e91edc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -25,11 +25,13 @@ import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
@@ -202,7 +204,7 @@ public class QuorumCnxManager {
}
static public InitialMessage parse(Long protocolVersion, DataInputStream din)
- throws InitialMessageException, IOException {
+ throws InitialMessageException, IOException {
Long sid;
if (protocolVersion != PROTOCOL_VERSION) {
@@ -248,7 +250,29 @@ public class QuorumCnxManager {
throw new InitialMessageException("No port number in: %s", addr);
}
- return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
+ return new InitialMessage(sid, isWildcardAddress(host_port[0]) ? null :
+ new InetSocketAddress(host_port[0], port));
+ }
+
+ /**
+ * Returns true if the specified hostname is a wildcard address,
+ * like 0.0.0.0 for IPv4 or :: for IPv6
+ */
+ public static boolean isWildcardAddress(final String hostname) {
+ try {
+ return InetAddress.getByName(hostname).isAnyLocalAddress();
+ } catch (UnknownHostException e) {
+ // if we can not resolve, it can not be a wildcard address
+ return false;
+ } catch (SecurityException e) {
+ LOG.warn("SecurityException in getByName() for" + hostname);
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "InitialMessage{sid=" + sid + ", electionAddr=" + electionAddr + '}';
}
}
@@ -402,6 +426,7 @@ public class QuorumCnxManager {
throws IOException {
DataOutputStream dout = null;
DataInputStream din = null;
+ LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
try {
// Use BufferedOutputStream to reduce the number of IP packets. This is
// important for x-DC scenarios.
@@ -435,11 +460,11 @@ public class QuorumCnxManager {
// If lost the challenge, then drop the new connection
if (sid > self.getId()) {
- LOG.info("Have smaller server identifier, so dropping the " +
- "connection: (" + sid + ", " + self.getId() + ")");
+ LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
closeSocket(sock);
// Otherwise proceed with the connection
} else {
+ LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
sw.setRecv(rw);
@@ -475,10 +500,12 @@ public class QuorumCnxManager {
din = new DataInputStream(
new BufferedInputStream(sock.getInputStream()));
+ LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
+ LOG.debug("Exception details: ", e);
closeSocket(sock);
}
}
@@ -489,12 +516,14 @@ public class QuorumCnxManager {
*/
public void receiveConnectionAsync(final Socket sock) {
try {
+ LOG.debug("Async handling of connection request received from: {}", sock.getRemoteSocketAddress());
connectionExecutor.execute(
new QuorumConnectionReceiverThread(sock));
connectionThreadCnt.incrementAndGet();
} catch (Throwable e) {
LOG.error("Exception handling connection, addr: {}, closing server connection",
sock.getRemoteSocketAddress());
+ LOG.debug("Exception details: ", e);
closeSocket(sock);
}
}
@@ -530,7 +559,7 @@ public class QuorumCnxManager {
sid = init.sid;
electionAddr = init.electionAddr;
} catch (InitialMessage.InitialMessageException ex) {
- LOG.error(ex.toString());
+ LOG.error("Initial message parsing error!", ex);
closeSocket(sock);
return;
}
@@ -575,7 +604,10 @@ public class QuorumCnxManager {
} else {
connectOne(sid);
}
-
+ } else if (sid == self.getId()) {
+ // we saw this case in ZOOKEEPER-2164
+ LOG.warn("We got a connection request from a server with our own ID. "
+ + "This should be either a configuration error, or a bug.");
} else { // Otherwise start worker threads to receive data.
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
@@ -710,6 +742,7 @@ public class QuorumCnxManager {
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
if (lastCommittedView.containsKey(sid)) {
knownId = true;
+ LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
return;
}
@@ -717,6 +750,7 @@ public class QuorumCnxManager {
&& (!knownId || (lastProposedView.get(sid).electionAddr !=
lastCommittedView.get(sid).electionAddr))) {
knownId = true;
+ LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
if (connectOne(sid, lastProposedView.get(sid).electionAddr))
return;
}
@@ -787,6 +821,7 @@ public class QuorumCnxManager {
public void softHalt() {
for (SendWorker sw : senderWorkerMap.values()) {
LOG.debug("Halting sender: " + sw);
+ LOG.debug("Server {} is soft-halting sender towards: {}", self.getId(), sw);
sw.finish();
}
}
@@ -892,6 +927,7 @@ public class QuorumCnxManager {
Socket client = null;
Exception exitException = null;
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
+ LOG.debug("Listener thread started, myId: {}", self.getId());
try {
if (self.shouldUsePortUnification()) {
LOG.info("Creating TLS-enabled quorum server socket");
@@ -914,15 +950,14 @@ public class QuorumCnxManager {
self.recreateSocketAddresses(self.getId());
addr = self.getElectionAddress();
}
- LOG.info("My election bind port: " + addr.toString());
+ LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, addr.toString());
setName(addr.toString());
ss.bind(addr);
while (!shutdown) {
try {
client = ss.accept();
setSockOpts(client);
- LOG.info("Received connection request "
- + formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
+ LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
// Receive and handle the connection request
// asynchronously if the quorum sasl authentication is
// enabled. This is required because sasl server
@@ -990,7 +1025,8 @@ public class QuorumCnxManager {
*/
void halt(){
try{
- LOG.debug("Trying to close listener: " + ss);
+
+ LOG.debug("Halt called: Trying to close listeners");
if(ss != null) {
LOG.debug("Closing listener: "
+ QuorumCnxManager.this.mySid);
@@ -1052,7 +1088,7 @@ public class QuorumCnxManager {
}
synchronized boolean finish() {
- LOG.debug("Calling finish for " + sid);
+ LOG.debug("Calling SendWorker.finish for {}", sid);
if(!running){
/*
@@ -1119,7 +1155,7 @@ public class QuorumCnxManager {
LOG.error("Failed to send last message. Shutting down thread.", e);
this.finish();
}
-
+ LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
try {
while (running && !shutdown && sock != null) {
@@ -1187,6 +1223,7 @@ public class QuorumCnxManager {
* @return boolean Value of variable running
*/
synchronized boolean finish() {
+ LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
if(!running){
/*
* Avoids running finish() twice.
@@ -1204,6 +1241,7 @@ public class QuorumCnxManager {
public void run() {
threadCnt.incrementAndGet();
try {
+ LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
while (running && !shutdown && sock != null) {
/**
* Reads the first int to determine the length of the
@@ -1227,7 +1265,7 @@ public class QuorumCnxManager {
LOG.warn("Connection broken for id " + sid + ", my id = "
+ QuorumCnxManager.this.mySid + ", error = " , e);
} finally {
- LOG.warn("Interrupting SendWorker");
+ LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
sw.finish();
closeSocket(sock);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index f6c6bb3..e8dae75 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -18,13 +18,15 @@
package org.apache.zookeeper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.fail;
+import java.time.LocalDateTime;
import org.junit.Rule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
-import org.junit.runners.model.FrameworkMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Base class for a non-parameterized ZK test.
@@ -75,5 +77,30 @@ public class ZKTestCase {
}
};
+ public interface WaitForCondition {
+ /**
+ * @return true when success
+ */
+ boolean evaluate();
+ }
+
+ /**
+ * Wait for condition to be true; otherwise fail the test if it exceed
+ * timeout
+ * @param msg error message to print when fail
+ * @param condition condition to evaluate
+ * @param timeout timeout in seconds
+ * @throws InterruptedException
+ */
+ public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
+ final LocalDateTime deadline = LocalDateTime.now().plusSeconds(timeout);
+ while (LocalDateTime.now().isBefore(deadline)) {
+ if (condition.evaluate()) {
+ return;
+ }
+ Thread.sleep(100);
+ }
+ fail(msg);
+ }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
index ea16961..7755b6b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
@@ -34,7 +34,6 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.zookeeper.common.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +49,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class CnxManagerTest extends ZKTestCase {
protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class);
@@ -182,7 +183,7 @@ public class CnxManagerTest extends ZKTestCase {
Assert.fail("Did not receive expected message");
}
cnxManager.halt();
- Assert.assertFalse(cnxManager.listener.isAlive());
+ assertFalse(cnxManager.listener.isAlive());
}
@Test
@@ -216,7 +217,7 @@ public class CnxManagerTest extends ZKTestCase {
if((end - begin) > 6000) Assert.fail("Waited more than necessary");
cnxManager.halt();
- Assert.assertFalse(cnxManager.listener.isAlive());
+ assertFalse(cnxManager.listener.isAlive());
}
/**
@@ -279,7 +280,7 @@ public class CnxManagerTest extends ZKTestCase {
}
peer.shutdown();
cnxManager.halt();
- Assert.assertFalse(cnxManager.listener.isAlive());
+ assertFalse(cnxManager.listener.isAlive());
}
/**
@@ -306,9 +307,9 @@ public class CnxManagerTest extends ZKTestCase {
// listener thread should stop and throws error which notify QuorumPeer about error.
// QuorumPeer should start shutdown process
listener.join(15000); // set wait time, if listener contains bug and thread not stops.
- Assert.assertFalse(listener.isAlive());
+ assertFalse(listener.isAlive());
Assert.assertTrue(errorHappend.get());
- Assert.assertFalse(QuorumPeer.class.getSimpleName() + " not stopped after "
+ assertFalse(QuorumPeer.class.getSimpleName() + " not stopped after "
+ "listener thread death", listener.isAlive());
}
@@ -363,7 +364,7 @@ public class CnxManagerTest extends ZKTestCase {
peer.shutdown();
cnxManager.halt();
- Assert.assertFalse(cnxManager.listener.isAlive());
+ assertFalse(cnxManager.listener.isAlive());
}
/*
@@ -391,7 +392,7 @@ public class CnxManagerTest extends ZKTestCase {
long end = Time.currentElapsedTime();
if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
cnxManager.halt();
- Assert.assertFalse(cnxManager.listener.isAlive());
+ assertFalse(cnxManager.listener.isAlive());
}
/*
@@ -545,6 +546,17 @@ public class CnxManagerTest extends ZKTestCase {
}
}
+ @Test
+ public void testWildcardAddressRecognition() {
+ assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("0.0.0.0"));
+ assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("::"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("::1"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("some.unresolvable.host.com"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("127.0.0.1"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("255.255.255.255"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("1.2.3.4"));
+ assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("www.google.com"));
+ }
private String createLongString(int size) {
StringBuilder sb = new StringBuilder(size);
for (int i=0; i < size; i++) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
new file mode 100644
index 0000000..0129550
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+import static org.junit.Assert.assertTrue;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumRestartTest extends ZKTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(QuorumRestartTest.class);
+ private QuorumUtil qu;
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+ // starting a 3 node ensemble without observers
+ qu = new QuorumUtil(1, 2);
+ qu.startAll();
+ }
+
+ /**
+ * A basic test for rolling restart. We are restarting the ZooKeeper servers one by one,
+ * starting from the first server. We always make sure that all the nodes joined to the
+ * Quorum before moving forward.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRollingRestart() throws Exception {
+ for (int serverToRestart = 1; serverToRestart <= 3; serverToRestart++) {
+ LOG.info("***** restarting: " + serverToRestart);
+ qu.shutdown(serverToRestart);
+
+ assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+ ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+ qu.restart(serverToRestart);
+
+ final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+ waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+ LOG.info("***** Restart {} succeeded", serverToRestart);
+ }
+ }
+
+ /**
+ * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+ * rejoin to the Quorum after restarting the servers backwards
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRollingRestartBackwards() throws Exception {
+ for (int serverToRestart = 3; serverToRestart >= 1; serverToRestart--) {
+ LOG.info("***** restarting: " + serverToRestart);
+ qu.shutdown(serverToRestart);
+
+ assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+ ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+ qu.restart(serverToRestart);
+
+ final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+ waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+ LOG.info("***** Restart {} succeeded", serverToRestart);
+ }
+ }
+
+ /**
+ * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+ * rejoin to the Quorum after restarting the current leader multiple times
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRestartingLeaderMultipleTimes() throws Exception {
+ for (int restartCount = 1; restartCount <= 3; restartCount++) {
+ int leaderId = qu.getLeaderServer();
+ LOG.info("***** new leader: " + leaderId);
+ qu.shutdown(leaderId);
+
+ assertTrue("Timeout during waiting for current leader to go down",
+ ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+ String errorMessage = "No new leader was elected";
+ waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 30);
+
+ qu.restart(leaderId);
+
+ errorMessage = "Not all the quorum members are connected after restarting the old leader";
+ waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+ LOG.info("***** Leader Restart {} succeeded", restartCount);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ qu.shutdownAll();
+ System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+ System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 314171d..113452a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -22,13 +22,14 @@ import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
+import java.util.TreeSet;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -50,6 +51,8 @@ public class QuorumUtil {
// TODO refactor QuorumBase to be special case of this
private static final Logger LOG = LoggerFactory.getLogger(QuorumUtil.class);
+ private static final Set<QuorumPeer.ServerState> CONNECTED_STATES = new TreeSet<>(
+ Arrays.asList(QuorumPeer.ServerState.LEADING, QuorumPeer.ServerState.FOLLOWING, QuorumPeer.ServerState.OBSERVING));
public static class PeerStruct {
public int id;
@@ -107,7 +110,7 @@ public class QuorumUtil {
peersView.put(Long.valueOf(i), new QuorumServer(i,
new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
- new InetSocketAddress("127.0.0.1", ps.clientPort),
+ new InetSocketAddress("127.0.0.1", ps.clientPort),
LearnerType.PARTICIPANT));
hostPort += "127.0.0.1:" + ps.clientPort + ((i == ALL) ? "" : ",");
}
@@ -274,6 +277,12 @@ public class QuorumUtil {
return "127.0.0.1:" + peer.getClientPort();
}
+ public boolean allPeersAreConnected() {
+ return peers.values().stream()
+ .map(ps -> ps.peer)
+ .allMatch(peer -> CONNECTED_STATES.contains(peer.getPeerState()));
+ }
+
public QuorumPeer getLeaderQuorumPeer() {
for (PeerStruct ps: peers.values()) {
if (ps.peer.leader != null) {
@@ -320,6 +329,15 @@ public class QuorumUtil {
return index;
}
+ public boolean leaderExists() {
+ for (int i = 1; i <= ALL; i++) {
+ if (getPeer(i).peer.leader != null) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public String getConnectionStringForServer(final int index) {
return "127.0.0.1:" + getPeer(index).clientPort;
}