You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2020/03/12 16:11:05 UTC

[zookeeper] branch branch-3.5 updated: ZOOKEEPER-2164: Quorum members can not rejoin after restart

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 800cc26  ZOOKEEPER-2164: Quorum members can not rejoin after restart
800cc26 is described below

commit 800cc26a768fceda9ea42fe8121acf45fe781c57
Author: Suhas Dantkale <sd...@salesforce.com>
AuthorDate: Thu Mar 12 17:10:43 2020 +0100

    ZOOKEEPER-2164: Quorum members can not rejoin after restart
    
    (Ported from Mate Szalay-Beko's work in master branch.
    Refer to https://github.com/apache/zookeeper/pull/1254)
    
    Ever since ZOOKEEPER-107 (released in 3.5.0) the servers are sending
    their addresses during initial connection requests. The receiving
    server can potentially use these addresses to send back a new
    connection request if the challenge is won by the receiver.
    
    If the server config contains a wildcard address (e.g. 0.0.0.0 in case
    of IPv4) then the first connection request sent by A to B will contain
    this address. If the ID of A is smaller than the ID of B, then A will
    lose the challenge and the second connection request sent back by B
    will never reach A, as B will send the initial message to 0.0.0.0.
    
    So in any 3.5+ ZooKeeper, if wildcard addresses are used in the configs,
    some servers might never be able to rejoin the quorum after they are
    restarted.
    
    In 3.5+ for backward compatibility reasons (needed during rolling
    upgrade) there is a version of the QuorumCnxManager.connectOne()
    method that needs no election address but uses the last known address
    to initiate the connection. In this commit, we simply call this method
    if the address is a wildcard address.
    
    I also added a few restart-related tests, to make sure that restart
    still works when we don't use wildcard addresses. We cannot test
    the original error with unit tests, as that would require starting
    the quorum on multiple hosts.
    
    I also tested manually the rolling restart both with and without
    wildcard addresses in the config.
    
    Reviewer: Mate Szalay-Beko <szalay.beko.mategmail.com>
    
    Author: Suhas Dantkale <sd...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>
    
    Closes #1256 from suhasdantkale/suhas/ZOOKEEPER-2164
---
 .../zookeeper/server/quorum/QuorumCnxManager.java  |  64 ++++++++---
 .../test/java/org/apache/zookeeper/ZKTestCase.java |  33 +++++-
 .../org/apache/zookeeper/test/CnxManagerTest.java  |  28 +++--
 .../apache/zookeeper/test/QuorumRestartTest.java   | 128 +++++++++++++++++++++
 .../java/org/apache/zookeeper/test/QuorumUtil.java |  22 +++-
 5 files changed, 249 insertions(+), 26 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 22a24bc..4e91edc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -25,11 +25,13 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
@@ -202,7 +204,7 @@ public class QuorumCnxManager {
         }
 
         static public InitialMessage parse(Long protocolVersion, DataInputStream din)
-            throws InitialMessageException, IOException {
+                throws InitialMessageException, IOException {
             Long sid;
 
             if (protocolVersion != PROTOCOL_VERSION) {
@@ -248,7 +250,29 @@ public class QuorumCnxManager {
                 throw new InitialMessageException("No port number in: %s", addr);
             }
 
-            return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
+            return new InitialMessage(sid, isWildcardAddress(host_port[0]) ? null :
+                    new InetSocketAddress(host_port[0], port));
+        }
+
+        /**
+         * Returns true if the specified hostname is a wildcard address,
+         * like 0.0.0.0 for IPv4 or :: for IPv6
+         */
+        public static boolean isWildcardAddress(final String hostname) {
+            try {
+                return InetAddress.getByName(hostname).isAnyLocalAddress();
+            } catch (UnknownHostException e) {
+                // if we can not resolve, it can not be a wildcard address
+                return false;
+            } catch (SecurityException e) {
+                LOG.warn("SecurityException in getByName() for" + hostname);
+                return false;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "InitialMessage{sid=" + sid + ", electionAddr=" + electionAddr + '}';
         }
     }
 
@@ -402,6 +426,7 @@ public class QuorumCnxManager {
             throws IOException {
         DataOutputStream dout = null;
         DataInputStream din = null;
+        LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
         try {
             // Use BufferedOutputStream to reduce the number of IP packets. This is
             // important for x-DC scenarios.
@@ -435,11 +460,11 @@ public class QuorumCnxManager {
 
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
-            LOG.info("Have smaller server identifier, so dropping the " +
-                    "connection: (" + sid + ", " + self.getId() + ")");
+            LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
             closeSocket(sock);
             // Otherwise proceed with the connection
         } else {
+            LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
             SendWorker sw = new SendWorker(sock, sid);
             RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
@@ -475,10 +500,12 @@ public class QuorumCnxManager {
             din = new DataInputStream(
                     new BufferedInputStream(sock.getInputStream()));
 
+            LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
             handleConnection(sock, din);
         } catch (IOException e) {
             LOG.error("Exception handling connection, addr: {}, closing server connection",
                     sock.getRemoteSocketAddress());
+            LOG.debug("Exception details: ", e);
             closeSocket(sock);
         }
     }
@@ -489,12 +516,14 @@ public class QuorumCnxManager {
      */
     public void receiveConnectionAsync(final Socket sock) {
         try {
+            LOG.debug("Async handling of connection request received from: {}", sock.getRemoteSocketAddress());
             connectionExecutor.execute(
                     new QuorumConnectionReceiverThread(sock));
             connectionThreadCnt.incrementAndGet();
         } catch (Throwable e) {
             LOG.error("Exception handling connection, addr: {}, closing server connection",
                     sock.getRemoteSocketAddress());
+            LOG.debug("Exception details: ", e);
             closeSocket(sock);
         }
     }
@@ -530,7 +559,7 @@ public class QuorumCnxManager {
                     sid = init.sid;
                     electionAddr = init.electionAddr;
                 } catch (InitialMessage.InitialMessageException ex) {
-                    LOG.error(ex.toString());
+                    LOG.error("Initial message parsing error!", ex);
                     closeSocket(sock);
                     return;
                 }
@@ -575,7 +604,10 @@ public class QuorumCnxManager {
             } else {
                 connectOne(sid);
             }
-
+        } else if (sid == self.getId()) {
+            // we saw this case in ZOOKEEPER-2164
+            LOG.warn("We got a connection request from a server with our own ID. "
+                    + "This should be either a configuration error, or a bug.");
         } else { // Otherwise start worker threads to receive data.
             SendWorker sw = new SendWorker(sock, sid);
             RecvWorker rw = new RecvWorker(sock, din, sid, sw);
@@ -710,6 +742,7 @@ public class QuorumCnxManager {
             Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
             if (lastCommittedView.containsKey(sid)) {
                 knownId = true;
+                LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
                 if (connectOne(sid, lastCommittedView.get(sid).electionAddr))
                     return;
             }
@@ -717,6 +750,7 @@ public class QuorumCnxManager {
                     && (!knownId || (lastProposedView.get(sid).electionAddr !=
                     lastCommittedView.get(sid).electionAddr))) {
                 knownId = true;
+                LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
                 if (connectOne(sid, lastProposedView.get(sid).electionAddr))
                     return;
             }
@@ -787,6 +821,7 @@ public class QuorumCnxManager {
     public void softHalt() {
         for (SendWorker sw : senderWorkerMap.values()) {
             LOG.debug("Halting sender: " + sw);
+            LOG.debug("Server {} is soft-halting sender towards: {}", self.getId(), sw);
             sw.finish();
         }
     }
@@ -892,6 +927,7 @@ public class QuorumCnxManager {
             Socket client = null;
             Exception exitException = null;
             while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
+                LOG.debug("Listener thread started, myId: {}", self.getId());
                 try {
                     if (self.shouldUsePortUnification()) {
                         LOG.info("Creating TLS-enabled quorum server socket");
@@ -914,15 +950,14 @@ public class QuorumCnxManager {
                         self.recreateSocketAddresses(self.getId());
                         addr = self.getElectionAddress();
                     }
-                    LOG.info("My election bind port: " + addr.toString());
+                    LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, addr.toString());
                     setName(addr.toString());
                     ss.bind(addr);
                     while (!shutdown) {
                         try {
                             client = ss.accept();
                             setSockOpts(client);
-                            LOG.info("Received connection request "
-                                    + formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
+                            LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                             // Receive and handle the connection request
                             // asynchronously if the quorum sasl authentication is
                             // enabled. This is required because sasl server
@@ -990,7 +1025,8 @@ public class QuorumCnxManager {
          */
         void halt(){
             try{
-                LOG.debug("Trying to close listener: " + ss);
+
+                LOG.debug("Halt called: Trying to close listeners");
                 if(ss != null) {
                     LOG.debug("Closing listener: "
                               + QuorumCnxManager.this.mySid);
@@ -1052,7 +1088,7 @@ public class QuorumCnxManager {
         }
 
         synchronized boolean finish() {
-            LOG.debug("Calling finish for " + sid);
+            LOG.debug("Calling SendWorker.finish for {}", sid);
 
             if(!running){
                 /*
@@ -1119,7 +1155,7 @@ public class QuorumCnxManager {
                 LOG.error("Failed to send last message. Shutting down thread.", e);
                 this.finish();
             }
-
+            LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
             try {
                 while (running && !shutdown && sock != null) {
 
@@ -1187,6 +1223,7 @@ public class QuorumCnxManager {
          * @return boolean  Value of variable running
          */
         synchronized boolean finish() {
+            LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
             if(!running){
                 /*
                  * Avoids running finish() twice.
@@ -1204,6 +1241,7 @@ public class QuorumCnxManager {
         public void run() {
             threadCnt.incrementAndGet();
             try {
+                LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
                 while (running && !shutdown && sock != null) {
                     /**
                      * Reads the first int to determine the length of the
@@ -1227,7 +1265,7 @@ public class QuorumCnxManager {
                 LOG.warn("Connection broken for id " + sid + ", my id = "
                          + QuorumCnxManager.this.mySid + ", error = " , e);
             } finally {
-                LOG.warn("Interrupting SendWorker");
+                LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
                 sw.finish();
                 closeSocket(sock);
             }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index f6c6bb3..e8dae75 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -18,13 +18,15 @@
 
 package org.apache.zookeeper;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.fail;
+import java.time.LocalDateTime;
 import org.junit.Rule;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 import org.junit.runner.RunWith;
-import org.junit.runners.model.FrameworkMethod;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
 /**
  * Base class for a non-parameterized ZK test.
@@ -75,5 +77,30 @@ public class ZKTestCase {
         }
 
     };
+    public interface WaitForCondition {
+        /**
+         * @return true when success
+         */
+        boolean evaluate();
+    }
+
+    /**
+     * Wait for condition to be true; otherwise fail the test if it exceed
+     * timeout
+     * @param msg       error message to print when fail
+     * @param condition condition to evaluate
+     * @param timeout   timeout in seconds
+     * @throws InterruptedException
+     */
+    public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
+        final LocalDateTime deadline = LocalDateTime.now().plusSeconds(timeout);
+        while (LocalDateTime.now().isBefore(deadline)) {
+            if (condition.evaluate()) {
+                return;
+            }
+            Thread.sleep(100);
+        }
+        fail(msg);
+    }
 
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
index ea16961..7755b6b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
@@ -34,7 +34,6 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.net.Socket;
 import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +49,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class CnxManagerTest extends ZKTestCase {
     protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class);
@@ -182,7 +183,7 @@ public class CnxManagerTest extends ZKTestCase {
                 Assert.fail("Did not receive expected message");
         }
         cnxManager.halt();
-        Assert.assertFalse(cnxManager.listener.isAlive());
+        assertFalse(cnxManager.listener.isAlive());
     }
 
     @Test
@@ -216,7 +217,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         if((end - begin) > 6000) Assert.fail("Waited more than necessary");
         cnxManager.halt();
-        Assert.assertFalse(cnxManager.listener.isAlive());
+        assertFalse(cnxManager.listener.isAlive());
     }
 
     /**
@@ -279,7 +280,7 @@ public class CnxManagerTest extends ZKTestCase {
         }
         peer.shutdown();
         cnxManager.halt();
-        Assert.assertFalse(cnxManager.listener.isAlive());
+        assertFalse(cnxManager.listener.isAlive());
     }
 
     /**
@@ -306,9 +307,9 @@ public class CnxManagerTest extends ZKTestCase {
         // listener thread should stop and throws error which notify QuorumPeer about error.
         // QuorumPeer should start shutdown process
         listener.join(15000); // set wait time, if listener contains bug and thread not stops.
-        Assert.assertFalse(listener.isAlive());
+        assertFalse(listener.isAlive());
         Assert.assertTrue(errorHappend.get());
-        Assert.assertFalse(QuorumPeer.class.getSimpleName() + " not stopped after "
+        assertFalse(QuorumPeer.class.getSimpleName() + " not stopped after "
                            + "listener thread death", listener.isAlive());
     }
 
@@ -363,7 +364,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         peer.shutdown();
         cnxManager.halt();
-        Assert.assertFalse(cnxManager.listener.isAlive());
+        assertFalse(cnxManager.listener.isAlive());
     }
 
     /*
@@ -391,7 +392,7 @@ public class CnxManagerTest extends ZKTestCase {
         long end = Time.currentElapsedTime();
         if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
         cnxManager.halt();
-        Assert.assertFalse(cnxManager.listener.isAlive());
+        assertFalse(cnxManager.listener.isAlive());
     }
 
     /*
@@ -545,6 +546,17 @@ public class CnxManagerTest extends ZKTestCase {
         }
     }
 
+    @Test
+    public void testWildcardAddressRecognition() {
+        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("0.0.0.0"));
+        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("::"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("::1"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("some.unresolvable.host.com"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("127.0.0.1"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("255.255.255.255"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("1.2.3.4"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("www.google.com"));
+    }
     private String createLongString(int size) {
         StringBuilder sb = new StringBuilder(size);
         for (int i=0; i < size; i++) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
new file mode 100644
index 0000000..0129550
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+import static org.junit.Assert.assertTrue;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumRestartTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumRestartTest.class);
+    private QuorumUtil qu;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+        // starting a 3 node ensemble without observers
+        qu = new QuorumUtil(1, 2);
+        qu.startAll();
+    }
+
+    /**
+     * A basic test for rolling restart. We are restarting the ZooKeeper servers one by one,
+     * starting from the first server. We always make sure that all the nodes joined to the
+     * Quorum before moving forward.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRollingRestart() throws Exception {
+        for (int serverToRestart = 1; serverToRestart <= 3; serverToRestart++) {
+            LOG.info("***** restarting: " + serverToRestart);
+            qu.shutdown(serverToRestart);
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            qu.restart(serverToRestart);
+
+            final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Restart {} succeeded", serverToRestart);
+        }
+    }
+
+    /**
+     * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+     * rejoin to the Quorum after restarting the servers backwards
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRollingRestartBackwards() throws Exception {
+        for (int serverToRestart = 3; serverToRestart >= 1; serverToRestart--) {
+            LOG.info("***** restarting: " + serverToRestart);
+            qu.shutdown(serverToRestart);
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            qu.restart(serverToRestart);
+
+            final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Restart {} succeeded", serverToRestart);
+        }
+    }
+
+    /**
+     * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+     * rejoin to the Quorum after restarting the current leader multiple times
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testRestartingLeaderMultipleTimes() throws Exception {
+        for (int restartCount = 1; restartCount <= 3; restartCount++) {
+            int leaderId = qu.getLeaderServer();
+            LOG.info("***** new leader: " + leaderId);
+            qu.shutdown(leaderId);
+
+            assertTrue("Timeout during waiting for current leader to go down",
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            String errorMessage = "No new leader was elected";
+            waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 30);
+
+            qu.restart(leaderId);
+
+            errorMessage = "Not all the quorum members are connected after restarting the old leader";
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Leader Restart {} succeeded", restartCount);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        qu.shutdownAll();
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 314171d..113452a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -22,13 +22,14 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.TreeSet;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -50,6 +51,8 @@ public class QuorumUtil {
     // TODO refactor QuorumBase to be special case of this
 
     private static final Logger LOG = LoggerFactory.getLogger(QuorumUtil.class);
+    private static final Set<QuorumPeer.ServerState> CONNECTED_STATES = new TreeSet<>(
+            Arrays.asList(QuorumPeer.ServerState.LEADING, QuorumPeer.ServerState.FOLLOWING, QuorumPeer.ServerState.OBSERVING));
 
     public static class PeerStruct {
         public int id;
@@ -107,7 +110,7 @@ public class QuorumUtil {
                 peersView.put(Long.valueOf(i), new QuorumServer(i, 
                                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
                                new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
-                               new InetSocketAddress("127.0.0.1", ps.clientPort), 
+                               new InetSocketAddress("127.0.0.1", ps.clientPort),
                                LearnerType.PARTICIPANT));
                 hostPort += "127.0.0.1:" + ps.clientPort + ((i == ALL) ? "" : ",");
             }
@@ -274,6 +277,12 @@ public class QuorumUtil {
         return "127.0.0.1:" + peer.getClientPort();
     }
 
+    public boolean allPeersAreConnected() {
+        return peers.values().stream()
+                .map(ps -> ps.peer)
+                .allMatch(peer -> CONNECTED_STATES.contains(peer.getPeerState()));
+    }
+
     public QuorumPeer getLeaderQuorumPeer() {
         for (PeerStruct ps: peers.values()) {
             if (ps.peer.leader != null) {
@@ -320,6 +329,15 @@ public class QuorumUtil {
         return index;
     }
 
+    public boolean leaderExists() {
+        for (int i = 1; i <= ALL; i++) {
+            if (getPeer(i).peer.leader != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public String getConnectionStringForServer(final int index) {
         return "127.0.0.1:" + getPeer(index).clientPort;
     }