You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/08/03 16:23:02 UTC

zookeeper git commit: ZOOKEEPER-1669: Operations to server will be timed-out while thousands of sessions expired same time

Repository: zookeeper
Updated Branches:
  refs/heads/branch-3.4 1789e0d6f -> 7294f8b1b


ZOOKEEPER-1669: Operations to server will be timed-out while thousands of sessions expired same time

The issue arises when tens of thousands of clients try to reconnect to the ZooKeeper service at once. We ran into it while maintaining our HBase cluster, which used a 5-server ZooKeeper ensemble. The HBase cluster consisted of a very large number of regionservers (on the order of thousands) and was accessed by tens of thousands of clients performing massive reads and writes. Because the read/write throughput was very high, the ZooKeeper zxid also increased quickly: roughly every two or three weeks, the zxid rolled over and triggered a ZooKeeper leader re-election. Each leader re-election caused the clients (HBase regionservers and HBase clients) to disconnect from and reconnect to the ZooKeeper servers at the same time, and to try to renew their sessions.

In the current implementation, when a session is renewed, the server closes any existing connection for that session. To do this, NIOServerCnxnFactory first clones the entire connection set (to avoid races between threads) and then iterates over the clone one connection at a time to find the one that owns the session. This is very time-consuming: every session close costs time proportional to the total number of connections, so thousands of simultaneous closes add up quickly. In our case (described above), many region servers could not renew their sessions before the session timeout. The HBase cluster eventually lost these region servers, which hurt HBase stability.

This change refactors the session-close logic and introduces a ConcurrentHashMap that maps each session ID to its connection. Because ConcurrentHashMap is thread-safe, a session's connection can be found with a single lookup, eliminating the need to clone the connection set first.

Author: Sun Qi <su...@qiyi.com>

Reviewers: Edward Ribeiro <ed...@gmail.com>, Michael Han <ha...@apache.org>

Closes #312 from CheneySun/branch-3.4


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/7294f8b1
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/7294f8b1
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/7294f8b1

Branch: refs/heads/branch-3.4
Commit: 7294f8b1b260c76fc6cdd5d3f6e5125c4e9577b3
Parents: 1789e0d
Author: Sun Qi <su...@qiyi.com>
Authored: Thu Aug 3 09:22:54 2017 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Thu Aug 3 09:22:54 2017 -0700

----------------------------------------------------------------------
 .../apache/zookeeper/server/NIOServerCnxn.java  | 49 +++++++++-----------
 .../zookeeper/server/NIOServerCnxnFactory.java  | 42 +++++++++++------
 .../zookeeper/server/NettyServerCnxn.java       | 32 +++++--------
 .../server/NettyServerCnxnFactory.java          | 41 +++++++++++-----
 .../org/apache/zookeeper/server/ServerCnxn.java |  6 +++
 .../zookeeper/server/ServerCnxnFactory.java     |  9 ++++
 6 files changed, 108 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
index 456d4c2..52d0626 100644
--- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -222,6 +222,15 @@ public class NIOServerCnxn extends ServerCnxn {
         return sock.isOpen();
     }
 
+    @Override
+    public InetAddress getSocketAddress() {
+        if (sock == null) {
+            return null;
+        }
+
+        return sock.socket().getInetAddress();
+    }
+
     /**
      * Handles read/write IO on connection.
      */
@@ -453,7 +462,7 @@ public class NIOServerCnxn extends ServerCnxn {
             }
         }
     }
-    
+
     /**
      * This class wraps the sendBuffer method of NIOServerCnxn. It is
      * responsible for chunking up the response to a client. Rather
@@ -1000,34 +1009,21 @@ public class NIOServerCnxn extends ServerCnxn {
      */
     @Override
     public void close() {
-        synchronized(factory.cnxns){
-            // if this is not in cnxns then it's already closed
-            if (!factory.cnxns.remove(this)) {
-                return;
-            }
+        factory.removeCnxn(this);
 
-            synchronized (factory.ipMap) {
-                Set<NIOServerCnxn> s =
-                    factory.ipMap.get(sock.socket().getInetAddress());
-                s.remove(this);
-            }
+        if (zkServer != null) {
+            zkServer.removeCnxn(this);
+        }
 
-            factory.unregisterConnection(this);
+        closeSock();
 
-            if (zkServer != null) {
-                zkServer.removeCnxn(this);
-            }
-    
-            closeSock();
-    
-            if (sk != null) {
-                try {
-                    // need to cancel this selection key from the selector
-                    sk.cancel();
-                } catch (Exception e) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("ignoring exception during selectionkey cancel", e);
-                    }
+        if (sk != null) {
+            try {
+                // need to cancel this selection key from the selector
+                sk.cancel();
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ignoring exception during selectionkey cancel", e);
                 }
             }
         }
@@ -1168,6 +1164,7 @@ public class NIOServerCnxn extends ServerCnxn {
     @Override
     public void setSessionId(long sessionId) {
         this.sessionId = sessionId;
+        this.factory.addSession(sessionId, this);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
index bd86cdd..d7581a4 100644
--- a/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
@@ -151,6 +151,29 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
         }
     }
 
+    public void removeCnxn(NIOServerCnxn cnxn) {
+        synchronized(cnxns) {
+            // Remove the related session from the sessionMap.
+            long sessionId = cnxn.getSessionId();
+            if (sessionId != 0) {
+                sessionMap.remove(sessionId);
+            }
+
+            // if this is not in cnxns then it's already closed
+            if (!cnxns.remove(cnxn)) {
+                return;
+            }
+
+            synchronized (ipMap) {
+                Set<NIOServerCnxn> s =
+                        ipMap.get(cnxn.getSocketAddress());
+                s.remove(cnxn);
+            }
+
+            unregisterConnection(cnxn);
+        }
+    }
+
     protected NIOServerCnxn createConnection(SocketChannel sock,
             SelectionKey sk) throws IOException {
         return new NIOServerCnxn(zkServer, sock, sk, this);
@@ -275,19 +298,12 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
 
     @SuppressWarnings("unchecked")
     private void closeSessionWithoutWakeup(long sessionId) {
-        HashSet<NIOServerCnxn> cnxns;
-        synchronized (this.cnxns) {
-            cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
-        }
-
-        for (NIOServerCnxn cnxn : cnxns) {
-            if (cnxn.getSessionId() == sessionId) {
-                try {
-                    cnxn.close();
-                } catch (Exception e) {
-                    LOG.warn("exception during session close", e);
-                }
-                break;
+        NIOServerCnxn cnxn = (NIOServerCnxn) sessionMap.remove(sessionId);
+        if (cnxn != null) {
+            try {
+                cnxn.close();
+            } catch (Exception e) {
+                LOG.warn("exception during session close", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
index 99058d1..a1ca54b 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -95,26 +96,7 @@ public class NettyServerCnxn extends ServerCnxn {
         // connection bean leak under certain race conditions.
         factory.unregisterConnection(this);
 
-        synchronized(factory.cnxns){
-            // if this is not in cnxns then it's already closed
-            if (!factory.cnxns.remove(this)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("cnxns size:" + factory.cnxns.size());
-                }
-                return;
-            }
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("close in progress for sessionid:0x"
-                        + Long.toHexString(sessionId));
-            }
-
-            synchronized (factory.ipMap) {
-                Set<NettyServerCnxn> s =
-                    factory.ipMap.get(((InetSocketAddress)channel
-                            .getRemoteAddress()).getAddress());
-                s.remove(this);
-            }
-        }
+        factory.removeCnxn(this);
 
         if (channel.isOpen()) {
             channel.close();
@@ -204,6 +186,7 @@ public class NettyServerCnxn extends ServerCnxn {
     @Override
     public void setSessionId(long sessionId) {
         this.sessionId = sessionId;
+        factory.addSession(sessionId, this);
     }
 
     @Override
@@ -227,6 +210,15 @@ public class NettyServerCnxn extends ServerCnxn {
         packetSent();
     }
 
+    @Override
+    public InetAddress getSocketAddress() {
+        if (channel == null) {
+            return null;
+        }
+
+        return ((InetSocketAddress)(channel.getRemoteAddress())).getAddress();
+    }
+
     /**
      * clean up the socket related to a command and also make sure we flush the
      * data before we do that

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index 2d3c93f..a34a398 100644
--- a/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -287,18 +287,13 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         if (LOG.isDebugEnabled()) {
             LOG.debug("closeSession sessionid:0x" + sessionId);
         }
-        NettyServerCnxn[] allCnxns = null;
-        synchronized (cnxns) {
-            allCnxns = cnxns.toArray(new NettyServerCnxn[cnxns.size()]);
-        }
-        for (NettyServerCnxn cnxn : allCnxns) {
-            if (cnxn.getSessionId() == sessionId) {
-                try {
-                    cnxn.close();
-                } catch (Exception e) {
-                    LOG.warn("exception during session close", e);
-                }
-                break;
+
+        NettyServerCnxn cnxn = (NettyServerCnxn) sessionMap.remove(sessionId);
+        if (cnxn != null) {
+            try {
+                cnxn.close();
+            } catch (Exception e) {
+                LOG.warn("exception during session close", e);
             }
         }
     }
@@ -402,4 +397,26 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
         }
     }
 
+    public void removeCnxn(ServerCnxn cnxn) {
+        synchronized(cnxns){
+            // if this is not in cnxns then it's already closed
+            if (!cnxns.remove(cnxn)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("cnxns size:" + cnxns.size());
+                }
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("close in progress for sessionid:0x"
+                        + Long.toHexString(cnxn.getSessionId()));
+            }
+
+            synchronized (ipMap) {
+                Set<NettyServerCnxn> s =
+                        ipMap.get(cnxn.getSocketAddress());
+                s.remove(cnxn);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
index ad259e3..f3dc8e2 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -101,6 +102,11 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     abstract void setSessionTimeout(int sessionTimeout);
 
+    /**
+     * Wrapper method to return the socket address
+     */
+    public abstract InetAddress getSocketAddress();
+
     protected ZooKeeperSaslServer zooKeeperSaslServer = null;
 
     protected static class CloseRequestException extends IOException {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7294f8b1/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
index 2a67ec5..c37d83b 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginException;
@@ -48,6 +49,10 @@ public abstract class ServerCnxnFactory {
     
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
 
+    // sessionMap is used to speed up closeSession()
+    protected final ConcurrentMap<Long, ServerCnxn> sessionMap =
+            new ConcurrentHashMap<Long, ServerCnxn>();
+
     /**
      * The buffer will cause the connection to be close when we do a send.
      */
@@ -158,6 +163,10 @@ public abstract class ServerCnxnFactory {
 
     }
 
+    public void addSession(long sessionId, ServerCnxn cnxn) {
+        sessionMap.put(sessionId, cnxn);
+    }
+
     /**
      * Initialize the server SASL if specified.
      *