You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/11/17 17:33:50 UTC

zookeeper git commit: ZOOKEEPER-3177: Refactor request throttle logic in NIO and Netty to keep the same behavior and make the code easier to maintain

Repository: zookeeper
Updated Branches:
  refs/heads/master fe25fed93 -> db074423f


ZOOKEEPER-3177: Refactor request throttle logic in NIO and Netty to keep the same behavior and make the code easier to maintain

Author: Fangmin Lyu <al...@fb.com>

Reviewers: Enrico Olivelli <eo...@gmail.com>, Venkateswarlu Tumati <tu...@gmail.com>, Michael Han <ha...@apache.org>

Closes #673 from lvfangmin/ZOOKEEPER-3177


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/db074423
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/db074423
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/db074423

Branch: refs/heads/master
Commit: db074423f09026446640242ecfcf26310467b1fa
Parents: fe25fed
Author: Fangmin Lyu <al...@fb.com>
Authored: Sat Nov 17 09:33:40 2018 -0800
Committer: Michael Han <ha...@apache.org>
Committed: Sat Nov 17 09:33:40 2018 -0800

----------------------------------------------------------------------
 .../apache/zookeeper/server/DumbWatcher.java    |  3 +-
 .../apache/zookeeper/server/NIOServerCnxn.java  | 47 +++-----------------
 .../zookeeper/server/NettyServerCnxn.java       | 35 +++------------
 .../org/apache/zookeeper/server/ServerCnxn.java | 44 +++++++++++++++---
 .../zookeeper/server/ZooKeeperServer.java       | 32 +++++++------
 .../server/quorum/FollowerZooKeeperServer.java  | 10 ++---
 .../server/quorum/LeaderZooKeeperServer.java    | 10 ++---
 .../apache/zookeeper/server/MockServerCnxn.java |  6 ++-
 8 files changed, 85 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
index ff17181..f384d7c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
@@ -41,6 +41,7 @@ public class DumbWatcher extends ServerCnxn {
     }
 
     public DumbWatcher(long sessionId) {
+        super(null);
         this.sessionId = sessionId;
     }
 
@@ -75,7 +76,7 @@ public class DumbWatcher extends ServerCnxn {
     void enableRecv() { }
 
     @Override
-    void disableRecv() { }
+    void disableRecv(boolean waitDisableRecv) { }
 
     @Override
     protected ServerStats serverStats() { return null; }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index c344c65..b48eb3d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -77,25 +77,16 @@ public class NIOServerCnxn extends ServerCnxn {
 
     private int sessionTimeout;
 
-    private final ZooKeeperServer zkServer;
-
-    /**
-     * The number of requests that have been submitted but not yet responded to.
-     */
-    private final AtomicInteger outstandingRequests = new AtomicInteger(0);
-
     /**
      * This is the id that uniquely identifies the session of a client. Once
      * this session is no longer active, the ephemeral nodes will go away.
      */
     private long sessionId;
 
-    private final int outstandingLimit;
-
     public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
                          SelectionKey sk, NIOServerCnxnFactory factory,
                          SelectorThread selectorThread) throws IOException {
-        this.zkServer = zk;
+        super(zk);
         this.sock = sock;
         this.sk = sk;
         this.factory = factory;
@@ -103,11 +94,6 @@ public class NIOServerCnxn extends ServerCnxn {
         if (this.factory.login != null) {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
         }
-        if (zk != null) {
-            outstandingLimit = zk.getGlobalOutstandingLimit();
-        } else {
-            outstandingLimit = 1;
-        }
         sock.socket().setTcpNoDelay(true);
         /* set socket linger to false, so that socket close does not block */
         sock.socket().setSoLinger(false, -1);
@@ -380,21 +366,6 @@ public class NIOServerCnxn extends ServerCnxn {
         zkServer.processPacket(this, incomingBuffer);
     }
 
-    // Only called as callback from zkServer.processPacket()
-    protected void incrOutstandingRequests(RequestHeader h) {
-        if (h.getXid() >= 0) {
-            outstandingRequests.incrementAndGet();
-            // check throttling
-            int inProcess = zkServer.getInProcess();
-            if (inProcess > outstandingLimit) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Throttling recv " + inProcess);
-                }
-                disableRecv();
-            }
-        }
-    }
-
     // returns whether we are interested in writing, which is determined
     // by whether we have any pending buffers on the output queue or not
     private boolean getWriteInterest() {
@@ -411,7 +382,9 @@ public class NIOServerCnxn extends ServerCnxn {
 
     // Throttle acceptance of new requests. If this entailed a state change,
     // register an interest op update request with the selector.
-    public void disableRecv() {
+    //
+    // NIO does not support waiting for recv to be disabled, so waitDisableRecv is ignored
+    public void disableRecv(boolean waitDisableRecv) {
         if (throttled.compareAndSet(false, true)) {
             requestInterestOpsUpdate();
         }
@@ -566,10 +539,6 @@ public class NIOServerCnxn extends ServerCnxn {
         return zkServer != null && zkServer.isRunning();
     }
 
-    public long getOutstandingRequests() {
-        return outstandingRequests.get();
-    }
-
     /*
      * (non-Javadoc)
      *
@@ -689,13 +658,7 @@ public class NIOServerCnxn extends ServerCnxn {
     public void sendResponse(ReplyHeader h, Record r, String tag) {
         try {
             super.sendResponse(h, r, tag);
-            if (h.getXid() > 0) {
-                // check throttling
-                if (outstandingRequests.decrementAndGet() < 1 ||
-                    zkServer.getInProcess() < outstandingLimit) {
-                    enableRecv();
-                }
-            }
+            decrOutstandingAndCheckThrottle(h);
          } catch(Exception e) {
             LOG.warn("Unexpected exception. Destruction averted.", e);
          }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ab3fd09..f0a8f7f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -61,23 +61,16 @@ public class NettyServerCnxn extends ServerCnxn {
     ByteBuffer bbLen = ByteBuffer.allocate(4);
     long sessionId;
     int sessionTimeout;
-    AtomicLong outstandingCount = new AtomicLong();
     Certificate[] clientChain;
     volatile boolean closingChannel;
 
-    /** The ZooKeeperServer for this connection. May be null if the server
-     * is not currently serving requests (for example if the server is not
-     * an active quorum participant.
-     */
-    private volatile ZooKeeperServer zkServer;
-
     NettyServerCnxnFactory factory;
     boolean initialized;
 
     NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
+        super(zks);
         this.channel = channel;
         this.closingChannel = false;
-        this.zkServer = zks;
         this.factory = factory;
         if (this.factory.login != null) {
             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
@@ -189,12 +182,7 @@ public class NettyServerCnxn extends ServerCnxn {
             return;
         }
         super.sendResponse(h, r, tag);
-        if (h.getXid() > 0) {
-            // zks cannot be null otherwise we would not have gotten here!
-            if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
-                enableRecv();
-            }
-        }
+        decrOutstandingAndCheckThrottle(h);
     }
 
     @Override
@@ -355,10 +343,6 @@ public class NettyServerCnxn extends ServerCnxn {
                         }
                         if (initialized) {
                             zks.processPacket(this, bb);
-
-                            if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
-                                disableRecvNoWait();
-                            }
                         } else {
                             LOG.debug("got conn req request from "
                                     + getRemoteSocketAddress());
@@ -420,21 +404,16 @@ public class NettyServerCnxn extends ServerCnxn {
     }
 
     @Override
-    public void disableRecv() {
-        disableRecvNoWait().awaitUninterruptibly();
-    }
-
-    private ChannelFuture disableRecvNoWait() {
+    public void disableRecv(boolean waitDisableRecv) {
         throttled = true;
         if (LOG.isDebugEnabled()) {
             LOG.debug("Throttling - disabling recv " + this);
         }
-        return channel.setReadable(false);
-    }
+        ChannelFuture cf = channel.setReadable(false);
 
-    @Override
-    public long getOutstandingRequests() {
-        return outstandingCount.longValue();
+        if (waitDisableRecv) {
+            cf.awaitUninterruptibly();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 0822f19..8e145cb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -68,8 +68,39 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     private volatile boolean stale = false;
 
+    AtomicLong outstandingCount = new AtomicLong();
+
+    /** The ZooKeeperServer for this connection. May be null if the server
+     * is not currently serving requests (for example if the server is not
+     * an active quorum participant).
+     */
+    final ZooKeeperServer zkServer;
+
+    public ServerCnxn(final ZooKeeperServer zkServer) {
+        this.zkServer = zkServer;
+    }
+
     abstract int getSessionTimeout();
 
+    public void incrOutstandingAndCheckThrottle(RequestHeader h) {
+        if (h.getXid() <= 0) {
+            return;
+        }
+        if (zkServer.shouldThrottle(outstandingCount.incrementAndGet())) {
+            disableRecv(false);
+        }
+    }
+
+    // called from sendResponse(); the matching increment is done in zkServer.processPacket
+    public void decrOutstandingAndCheckThrottle(ReplyHeader h) {
+        if (h.getXid() <= 0) {
+            return;
+        }
+        if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
+            enableRecv();
+        }
+    }
+
     abstract void close();
 
     public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
@@ -119,8 +150,12 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     abstract void enableRecv();
 
-    abstract void disableRecv();
+    void disableRecv() {
+        disableRecv(true);
+    }
 
+    abstract void disableRecv(boolean waitDisableRecv);
+    
     abstract void setSessionTimeout(int sessionTimeout);
 
     protected ZooKeeperSaslServer zooKeeperSaslServer = null;
@@ -207,9 +242,6 @@ public abstract class ServerCnxn implements Stats, Watcher {
         return packetsReceived.incrementAndGet();
     }
 
-    protected void incrOutstandingRequests(RequestHeader h) {
-    }
-
     protected long incrPacketsSent() {
         return packetsSent.incrementAndGet();
     }
@@ -241,7 +273,9 @@ public abstract class ServerCnxn implements Stats, Watcher {
         return (Date)established.clone();
     }
 
-    public abstract long getOutstandingRequests();
+    public long getOutstandingRequests() {
+        return outstandingCount.longValue();
+    }
 
     public long getPacketsReceived() {
         return packetsReceived.longValue();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 02df585..3ab81e7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -86,10 +86,16 @@ import org.slf4j.LoggerFactory;
 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected static final Logger LOG;
 
+    public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
+    protected static int globalOutstandingLimit = 1000;
+
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
 
         Environment.logEnv("Server environment:", LOG);
+
+        globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000);
+        LOG.info("{} = {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
     }
 
     protected ZooKeeperServerBean jmxServerBean;
@@ -858,17 +864,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    public int getGlobalOutstandingLimit() {
-        String sc = System.getProperty("zookeeper.globalOutstandingLimit");
-        int limit;
-        try {
-            limit = Integer.parseInt(sc);
-        } catch (Exception e) {
-            limit = 1000;
-        }
-        return limit;
-    }
-
     public void setServerCnxnFactory(ServerCnxnFactory factory) {
         serverCnxnFactory = factory;
     }
@@ -1095,7 +1090,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public boolean shouldThrottle(long outStandingCount) {
-        if (getGlobalOutstandingLimit() < getInProcess()) {
+        if (globalOutstandingLimit < getInProcess()) {
             return outStandingCount > 0;
         }
         return false;
@@ -1107,6 +1102,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
         RequestHeader h = new RequestHeader();
         h.deserialize(bia, "header");
+
+        // Need to increase the outstanding request count first, otherwise
+        // there might be a race condition where recv is enabled after the
+        // request is processed and then disabled by the throttling check.
+        //
+        // Be aware that we're actually checking the global outstanding
+        // request count as it was before this request.
+        //
+        // It's fine if an IOException is thrown before we decrease the count
+        // in cnxn, since it will close the cnxn anyway.
+        cnxn.incrOutstandingAndCheckThrottle(h);
+
         // Through the magic of byte buffers, txn will not be
         // pointing
         // to the start of the txn
@@ -1157,7 +1164,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
             return;
         } else {
-            cnxn.incrOutstandingRequests(h);
             Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
               h.getType(), incomingBuffer, cnxn.getAuthInfo());
             si.setOwner(ServerCnxn.me);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 35ef055..ec529de 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -60,6 +60,10 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
         super(logFactory, self.tickTime, self.minSessionTimeout,
                 self.maxSessionTimeout, zkDb, self);
         this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
+
+        int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
+        globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor;
+        LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
     }
 
     public Follower getFollower(){
@@ -123,12 +127,6 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
     }
 
     @Override
-    public int getGlobalOutstandingLimit() {
-        int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
-        return super.getGlobalOutstandingLimit() / divisor;
-    }
-
-    @Override
     public String getState() {
         return "follower";
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 4f8c095..c6f60e1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -55,6 +55,10 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
      */
     LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
         super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
+
+        int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
+        globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor;
+        LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
     }
 
     public Leader getLeader(){
@@ -103,12 +107,6 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     }
 
     @Override
-    public int getGlobalOutstandingLimit() {
-        int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
-        return super.getGlobalOutstandingLimit() / divisor;
-    }
-
-    @Override
     public void createSessionTracker() {
         sessionTracker = new LeaderSessionTracker(
                 this, getZKDatabase().getSessionWithTimeOuts(),

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
index 7ae0004..20cf36d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
@@ -29,6 +29,10 @@ public class MockServerCnxn extends ServerCnxn {
     public Certificate[] clientChain;
     public boolean secure;
 
+    public MockServerCnxn() {
+        super(null);
+    }
+
     @Override
     int getSessionTimeout() {
         return 0;
@@ -84,7 +88,7 @@ public class MockServerCnxn extends ServerCnxn {
     }
 
     @Override
-    void disableRecv() {
+    void disableRecv(boolean waitDisableRecv) {
     }
 
     @Override