You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2020/03/12 13:47:46 UTC

[zookeeper] branch master updated: ZOOKEEPER-2164: Quorum members can not rejoin after restart

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 0287c95  ZOOKEEPER-2164: Quorum members can not rejoin after restart
0287c95 is described below

commit 0287c95726e66aa6723f0f91f8d85f9973dbbb60
Author: Mate Szalay-Beko <sz...@gmail.com>
AuthorDate: Thu Mar 12 14:47:27 2020 +0100

    ZOOKEEPER-2164: Quorum members can not rejoin after restart
    
    Ever since ZOOKEEPER-107 (released in 3.5.0) the servers are sending
    their addresses during initial connection requests. The receiving
    server can potentially use these addresses to send back a new
    connection request if the challenge is won by the receiver.
    
    If the server config contains a wildcard address (e.g. 0.0.0.0 in case
    of IPv4) then the first connection request sent by A to B will contain
    this address. If the ID of A is smaller than the ID of B, then A will
    lose the challenge and the second connection request sent back by B
    will never reach A, as B will send the initial message to 0.0.0.0.
    
    So in any 3.5+ ZooKeeper, if wildcard addresses are used in the configs,
    then some servers might never be able to rejoin the quorum
    after they are restarted.
    
    In 3.5+, for backward compatibility reasons (needed during rolling
    upgrade), there is a version of the QuorumCnxManager.connectOne()
    method that needs no election address but uses the last known address
    to initiate the connection. In this commit, we simply call this method
    if the address is a wildcard address.
    
    I also added a few restart-related tests, to make sure that restart
    still works when we don't use wildcard addresses. We cannot test
    the original error with unit tests, as it would require starting
    the quorum on multiple hosts.
    
    I also tested the patch for rolling restart manually both with and
    without wildcard addresses in the config.
    
    Author: Mate Szalay-Beko <sz...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Norbert Kalmar <nk...@apache.org>
    
    Closes #1254 from symat/ZOOKEEPER-2164
---
 .../zookeeper/server/quorum/QuorumCnxManager.java  |  67 ++++++++---
 .../zookeeper/server/quorum/CnxManagerTest.java    |  11 ++
 .../apache/zookeeper/test/QuorumRestartTest.java   | 133 +++++++++++++++++++++
 .../java/org/apache/zookeeper/test/QuorumUtil.java |  19 +++
 4 files changed, 216 insertions(+), 14 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 45ffeb8..66f6883 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -32,6 +32,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
@@ -266,12 +267,33 @@ public class QuorumCnxManager {
                 } catch (ArrayIndexOutOfBoundsException e) {
                     throw new InitialMessageException("No port number in: %s", addr);
                 }
-                addresses.add(new InetSocketAddress(host_port[0], port));
+                if (!isWildcardAddress(host_port[0])) {
+                    addresses.add(new InetSocketAddress(host_port[0], port));
+                }
             }
 
             return new InitialMessage(sid, addresses);
         }
 
+        /**
+         * Returns true if the specified hostname is a wildcard address,
+         * like 0.0.0.0 for IPv4 or :: for IPv6
+         *
+         * (the function is package-private to be visible for testing)
+         */
+        static boolean isWildcardAddress(final String hostname) {
+            try {
+                return InetAddress.getByName(hostname).isAnyLocalAddress();
+            } catch (UnknownHostException e) {
+                // if we can not resolve, it can not be a wildcard address
+                return false;
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "InitialMessage{sid=" + sid + ", electionAddr=" + electionAddr + '}';
+        }
     }
 
     public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
@@ -415,6 +437,7 @@ public class QuorumCnxManager {
     private boolean startConnection(Socket sock, Long sid) throws IOException {
         DataOutputStream dout = null;
         DataInputStream din = null;
+        LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
         try {
             // Use BufferedOutputStream to reduce the number of IP packets. This is
             // important for x-DC scenarios.
@@ -459,13 +482,11 @@ public class QuorumCnxManager {
 
         // If lost the challenge, then drop the new connection
         if (sid > self.getId()) {
-            LOG.info(
-                "Have smaller server identifier, so dropping the connection: ({}, {})",
-                sid,
-                self.getId());
+            LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
             closeSocket(sock);
             // Otherwise proceed with the connection
         } else {
+            LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
             SendWorker sw = new SendWorker(sock, sid);
             RecvWorker rw = new RecvWorker(sock, din, sid, sw);
             sw.setRecv(rw);
@@ -501,9 +522,11 @@ public class QuorumCnxManager {
         try {
             din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
 
+            LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
             handleConnection(sock, din);
         } catch (IOException e) {
             LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
+            LOG.debug("Exception details: ", e);
             closeSocket(sock);
         }
     }
@@ -514,10 +537,12 @@ public class QuorumCnxManager {
      */
     public void receiveConnectionAsync(final Socket sock) {
         try {
+            LOG.debug("Async handling of connection request received from: {}", sock.getRemoteSocketAddress());
             connectionExecutor.execute(new QuorumConnectionReceiverThread(sock));
             connectionThreadCnt.incrementAndGet();
         } catch (Throwable e) {
             LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
+            LOG.debug("Exception details: ", e);
             closeSocket(sock);
         }
     }
@@ -552,10 +577,13 @@ public class QuorumCnxManager {
                 try {
                     InitialMessage init = InitialMessage.parse(protocolVersion, din);
                     sid = init.sid;
-                    electionAddr = new MultipleAddresses(init.electionAddr,
-                        Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
+                    if (!init.electionAddr.isEmpty()) {
+                        electionAddr = new MultipleAddresses(init.electionAddr,
+                                Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
+                    }
+                    LOG.debug("Initial message parsed by {}: {}", self.getId(), init.toString());
                 } catch (InitialMessage.InitialMessageException ex) {
-                    LOG.error(ex.toString());
+                    LOG.error("Initial message parsing error!", ex);
                     closeSocket(sock);
                     return;
                 }
@@ -601,6 +629,10 @@ public class QuorumCnxManager {
                 connectOne(sid);
             }
 
+        } else if (sid == self.getId()) {
+            // we saw this case in ZOOKEEPER-2164
+            LOG.warn("We got a connection request from a server with our own ID. "
+                     + "This should be either a configuration error, or a bug.");
         } else { // Otherwise start worker threads to receive data.
             SendWorker sw = new SendWorker(sock, sid);
             RecvWorker rw = new RecvWorker(sock, din, sid, sw);
@@ -745,6 +777,7 @@ public class QuorumCnxManager {
             Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
             if (lastCommittedView.containsKey(sid)) {
                 knownId = true;
+                LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
                 if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
                     return;
                 }
@@ -754,6 +787,8 @@ public class QuorumCnxManager {
                 && (!knownId
                     || (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) {
                 knownId = true;
+                LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
+
                 if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
                     return;
                 }
@@ -821,7 +856,7 @@ public class QuorumCnxManager {
      */
     public void softHalt() {
         for (SendWorker sw : senderWorkerMap.values()) {
-            LOG.debug("Halting sender: {}", sw);
+            LOG.debug("Server {} is soft-halting sender towards: {}", self.getId(), sw);
             sw.finish();
         }
     }
@@ -925,6 +960,7 @@ public class QuorumCnxManager {
         @Override
         public void run() {
             if (!shutdown) {
+                LOG.debug("Listener thread started, myId: {}", self.getId());
                 Set<InetSocketAddress> addresses;
 
                 if (self.getQuorumListenOnAllIPs()) {
@@ -977,7 +1013,7 @@ public class QuorumCnxManager {
          * Halts this listener thread.
          */
         void halt() {
-            LOG.debug("Trying to close listeners");
+            LOG.debug("Halt called: Trying to close listeners");
             if (listenerHandlers != null) {
                 LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid);
                 for (ListenerHandler handler : listenerHandlers) {
@@ -1044,12 +1080,12 @@ public class QuorumCnxManager {
                 while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
                     try {
                         serverSocket = createNewServerSocket();
-                        LOG.info("My election bind port: {}", address.toString());
+                        LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
                         while (!shutdown) {
                             try {
                                 client = serverSocket.accept();
                                 setSockOpts(client);
-                                LOG.info("Received connection request {}", client.getRemoteSocketAddress());
+                                LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
                                 // Receive and handle the connection request
                                 // asynchronously if the quorum sasl authentication is
                                 // enabled. This is required because sasl server
@@ -1173,7 +1209,7 @@ public class QuorumCnxManager {
         }
 
         synchronized boolean finish() {
-            LOG.debug("Calling finish for {}", sid);
+            LOG.debug("Calling SendWorker.finish for {}", sid);
 
             if (!running) {
                 /*
@@ -1240,6 +1276,7 @@ public class QuorumCnxManager {
                 LOG.error("Failed to send last message. Shutting down thread.", e);
                 this.finish();
             }
+            LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
 
             try {
                 while (running && !shutdown && sock != null) {
@@ -1337,6 +1374,7 @@ public class QuorumCnxManager {
          * @return boolean  Value of variable running
          */
         synchronized boolean finish() {
+            LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
             if (!running) {
                 /*
                  * Avoids running finish() twice.
@@ -1354,6 +1392,7 @@ public class QuorumCnxManager {
         public void run() {
             threadCnt.incrementAndGet();
             try {
+                LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
                 while (running && !shutdown && sock != null) {
                     /**
                      * Reads the first int to determine the length of the
@@ -1377,7 +1416,7 @@ public class QuorumCnxManager {
                     QuorumCnxManager.this.mySid,
                     e);
             } finally {
-                LOG.warn("Interrupting SendWorker");
+                LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
                 sw.finish();
                 closeSocket(sock);
             }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
index 269b6a8..a99e336 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
@@ -667,6 +667,17 @@ public class CnxManagerTest extends ZKTestCase {
         }
     }
 
+    @Test
+    public void testWildcardAddressRecognition() {
+        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("0.0.0.0"));
+        assertTrue(QuorumCnxManager.InitialMessage.isWildcardAddress("::"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("some.unresolvable.host.com"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("127.0.0.1"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("255.255.255.255"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("1.2.3.4"));
+        assertFalse(QuorumCnxManager.InitialMessage.isWildcardAddress("www.google.com"));
+    }
+
     private String createLongString(int size) {
         StringBuilder sb = new StringBuilder(size);
         for (int i = 0; i < size; i++) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
new file mode 100644
index 0000000..afdafb7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumRestartTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+import static org.junit.Assert.assertTrue;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QuorumRestartTest extends ZKTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QuorumRestartTest.class);
+    private QuorumUtil qu;
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+        // starting a 3 node ensemble without observers
+        qu = new QuorumUtil(1, 2);
+        qu.startAll();
+    }
+
+
+    /**
+     * A basic test for rolling restart. We are restarting the ZooKeeper servers one by one,
+     * starting from the first server. We always make sure that all the nodes joined to the
+     * Quorum before moving forward.
+     * @throws Exception
+     */
+    @Test
+    public void testRollingRestart() throws Exception {
+        for (int serverToRestart = 1; serverToRestart <= 3; serverToRestart++) {
+            LOG.info("***** restarting: " + serverToRestart);
+            qu.shutdown(serverToRestart);
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            qu.restart(serverToRestart);
+
+            final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Restart {} succeeded", serverToRestart);
+        }
+    }
+
+    /**
+     * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+     * rejoin to the Quorum after restarting the servers backwards
+     * @throws Exception
+     */
+    @Test
+    public void testRollingRestartBackwards() throws Exception {
+        for (int serverToRestart = 3; serverToRestart >= 1; serverToRestart--) {
+            LOG.info("***** restarting: " + serverToRestart);
+            qu.shutdown(serverToRestart);
+
+            assertTrue(String.format("Timeout during waiting for server %d to go down", serverToRestart),
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(serverToRestart).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            qu.restart(serverToRestart);
+
+            final String errorMessage = "Not all the quorum members are connected after restarting server " + serverToRestart;
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Restart {} succeeded", serverToRestart);
+        }
+    }
+
+
+    /**
+     * Testing one of the errors reported in ZOOKEEPER-2164, when some servers can not
+     * rejoin to the Quorum after restarting the current leader multiple times
+     * @throws Exception
+     */
+    @Test
+    public void testRestartingLeaderMultipleTimes() throws Exception {
+        for (int restartCount = 1; restartCount <= 3; restartCount++) {
+            int leaderId = qu.getLeaderServer();
+            LOG.info("***** new leader: " + leaderId);
+            qu.shutdown(leaderId);
+
+            assertTrue("Timeout during waiting for current leader to go down",
+                    ClientBase.waitForServerDown("127.0.0.1:" + qu.getPeer(leaderId).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            String errorMessage = "No new leader was elected";
+            waitFor(errorMessage, () -> qu.leaderExists() && qu.getLeaderServer() != leaderId, 30);
+
+            qu.restart(leaderId);
+
+            errorMessage = "Not all the quorum members are connected after restarting the old leader";
+            waitFor(errorMessage, () -> qu.allPeersAreConnected(), 30);
+
+            LOG.info("***** Leader Restart {} succeeded", restartCount);
+        }
+    }
+
+
+    @After
+    public void tearDown() throws Exception {
+        qu.shutdownAll();
+        System.clearProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+    }
+
+
+
+
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 14e3bee..f793585 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -25,12 +25,14 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -51,6 +53,8 @@ public class QuorumUtil {
     // TODO refactor QuorumBase to be special case of this
 
     private static final Logger LOG = LoggerFactory.getLogger(QuorumUtil.class);
+    private static final Set<QuorumPeer.ServerState> CONNECTED_STATES = new TreeSet<>(
+        Arrays.asList(QuorumPeer.ServerState.LEADING, QuorumPeer.ServerState.FOLLOWING, QuorumPeer.ServerState.OBSERVING));
 
     public static class PeerStruct {
 
@@ -274,6 +278,12 @@ public class QuorumUtil {
         return "127.0.0.1:" + peer.getClientPort();
     }
 
+    public boolean allPeersAreConnected() {
+        return peers.values().stream()
+          .map(ps -> ps.peer)
+          .allMatch(peer -> CONNECTED_STATES.contains(peer.getPeerState()));
+    }
+
     public QuorumPeer getLeaderQuorumPeer() {
         for (PeerStruct ps : peers.values()) {
             if (ps.peer.leader != null) {
@@ -320,6 +330,15 @@ public class QuorumUtil {
         return index;
     }
 
+    public boolean leaderExists() {
+        for (int i = 1; i <= ALL; i++) {
+            if (getPeer(i).peer.leader != null) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public String getConnectionStringForServer(final int index) {
         return "127.0.0.1:" + getPeer(index).clientPort;
     }