You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2019/06/14 07:06:49 UTC

[zookeeper] branch master updated: ZOOKEEPER-3296: Explicitly closing the sslsocket when it failed handshake to prevent issue where peers cannot join quorum

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5874a0f  ZOOKEEPER-3296: Explicitly closing the sslsocket when it failed handshake to prevent issue where peers cannot join quorum
5874a0f is described below

commit 5874a0f355417024ce8ebe03ab2f6eaf5b9a228c
Author: Fangmin Lyu <fa...@apache.org>
AuthorDate: Fri Jun 14 09:06:42 2019 +0200

    ZOOKEEPER-3296: Explicitly closing the sslsocket when it failed handshake to prevent issue where peers cannot join quorum
    
    The quorum connection manager handles connections sequentially with a default listen backlog queue size of 50. During a network loss, socket reads time out after syncLimit * tickTime, so almost all of the subsequent connect requests in the backlog queue will have timed out on the other side before they are processed.
    
    Those timed-out learners will try to connect to a different server, leaving their connect requests on the server side without sending the close_notify packet. The server slowly consumes from this queue, waiting out the syncLimit * tickTime timeout for each request that hasn't sent a close_notify packet. Any new connect request is queued when there is a free spot in the listen backlog queue, but times out before the server handles it, so the server can never successfully finish any ne [...]
    
    Please check the Jira for more details.
    
    Author: Fangmin Lyu <fa...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Andor Molnár <an...@apache.org>
    
    Closes #843 from lvfangmin/ZOOKEEPER-3296
---
 .../zookeeper/server/quorum/QuorumCnxManager.java  | 22 +++---
 .../apache/zookeeper/server/quorum/QuorumPeer.java |  7 +-
 .../{test => server/quorum}/CnxManagerTest.java    | 86 +++++++++++++++++++++-
 3 files changed, 103 insertions(+), 12 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
index 7ba068b..7870bb3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
@@ -635,10 +635,12 @@ public class QuorumCnxManager {
     /**
      * Try to establish a connection to server with id sid using its electionAddr.
      *
+     * VisibleForTesting.
+     *
      *  @param sid  server id
      *  @return boolean success indication
      */
-    synchronized private boolean connectOne(long sid, InetSocketAddress electionAddr){
+    synchronized boolean connectOne(long sid, InetSocketAddress electionAddr){
         if (senderWorkerMap.get(sid) != null) {
             LOG.debug("There is a connection already for server " + sid);
             return true;
@@ -648,18 +650,18 @@ public class QuorumCnxManager {
         try {
             LOG.debug("Opening channel to server " + sid);
             if (self.isSslQuorum()) {
-                 SSLSocket sslSock = self.getX509Util().createSSLSocket();
-                 setSockOpts(sslSock);
-                 sslSock.connect(electionAddr, cnxTO);
-                 sslSock.startHandshake();
-                 sock = sslSock;
-                 LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
+                 sock = self.getX509Util().createSSLSocket();
              } else {
                  sock = new Socket();
-                 setSockOpts(sock);
-                 sock.connect(electionAddr, cnxTO);
-
              }
+            setSockOpts(sock);
+            sock.connect(electionAddr, cnxTO);
+            if (sock instanceof SSLSocket) {
+                SSLSocket sslSock = (SSLSocket) sock;
+                sslSock.startHandshake();
+                LOG.info("SSL handshake complete with {} - {} - {}", sslSock.getRemoteSocketAddress(), sslSock.getSession().getProtocol(), sslSock.getSession().getCipherSuite());
+            }
+
              LOG.debug("Connected to server " + sid);
             // Sends connection request asynchronously if the quorum
             // sasl authentication is enabled. This is required because
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index 8e866bd..8811875 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -883,10 +883,15 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
         quorumStats = new QuorumStats(this);
         jmxRemotePeerBean = new HashMap<Long, RemotePeerBean>();
         adminServer = AdminServerFactory.createAdminServer();
-        x509Util = new QuorumX509Util();
+        x509Util = createX509Util();
         initialize();
     }
 
+    // VisibleForTesting
+    QuorumX509Util createX509Util() {
+        return new QuorumX509Util();
+    }
+
     /**
      * For backward compatibility purposes, we instantiate QuorumMaj by default.
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
similarity index 84%
rename from zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
rename to zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
index 4e4c040..d3a631b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
@@ -16,14 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.server.quorum;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
@@ -31,9 +33,14 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.net.Socket;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.HandshakeCompletedListener;
 
+import org.apache.zookeeper.common.QuorumX509Util;
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +53,8 @@ import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.FLENewEpochTest;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -363,6 +372,81 @@ public class CnxManagerTest extends ZKTestCase {
         Assert.assertFalse(cnxManager.listener.isAlive());
     }
 
+    /**
+     * Test that the SSLSocket is explicitly closed when an IOException
+     * is thrown during the SSL handshake while connecting.
+     */
+    @Test
+    public void testSSLSocketClosedWhenHandshakeTimeout() throws Exception {
+        final CountDownLatch closeLatch = new CountDownLatch(1);
+        QuorumX509Util mockedX509Util = new QuorumX509Util() {
+            @Override
+            public SSLSocket createSSLSocket() {
+                return new SSLSocket() {
+
+                    @Override
+                    public void connect(SocketAddress endpoint, int timeout) {}
+
+                    @Override
+                    public void startHandshake() throws IOException {
+                        throw new IOException();
+                    }
+
+                    @Override
+                    public void close() {
+                        closeLatch.countDown();
+                    }
+
+                    public String [] getSupportedCipherSuites() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getEnabledCipherSuites() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getSupportedProtocols() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public String [] getEnabledProtocols() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public SSLSession getSession() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    public void setEnabledCipherSuites(String suites []) {}
+                    public void setEnabledProtocols(String protocols[]) {}
+                    public void addHandshakeCompletedListener(HandshakeCompletedListener listener) {}
+                    public void removeHandshakeCompletedListener(HandshakeCompletedListener listener) {}
+                    public void setUseClientMode(boolean mode) {}
+                    public boolean getUseClientMode() { return true; }
+                    public void setNeedClientAuth(boolean need) {}
+                    public boolean getNeedClientAuth() { return true; }
+                    public void setWantClientAuth(boolean want) {}
+                    public boolean getWantClientAuth() { return true; }
+                    public void setEnableSessionCreation(boolean flag) {}
+                    public boolean getEnableSessionCreation() { return true; }
+                };
+            }
+        };
+
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0],
+                peerClientPort[0], 3, 0, 2000, 2, 2) {
+            @Override
+            public QuorumX509Util createX509Util() {
+                return mockedX509Util;
+            }
+        };
+
+        peer.setSslQuorum(true);
+        QuorumCnxManager cnxManager = peer.createCnxnManager();
+        cnxManager.connectOne(1, peers.get(1L).electionAddr);
+        Assert.assertTrue(closeLatch.await(1, TimeUnit.SECONDS));
+    }
+
     /*
      * Test if Worker threads are getting killed after connection loss
      */