You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2018/11/17 17:33:50 UTC
zookeeper git commit: ZOOKEEPER-3177: Refactor request throttle logic
in NIO and Netty to keep the same behavior and make the code easier to
maintain
Repository: zookeeper
Updated Branches:
refs/heads/master fe25fed93 -> db074423f
ZOOKEEPER-3177: Refactor request throttle logic in NIO and Netty to keep the same behavior and make the code easier to maintain
Author: Fangmin Lyu <al...@fb.com>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Venkateswarlu Tumati <tu...@gmail.com>, Michael Han <ha...@apache.org>
Closes #673 from lvfangmin/ZOOKEEPER-3177
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/db074423
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/db074423
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/db074423
Branch: refs/heads/master
Commit: db074423f09026446640242ecfcf26310467b1fa
Parents: fe25fed
Author: Fangmin Lyu <al...@fb.com>
Authored: Sat Nov 17 09:33:40 2018 -0800
Committer: Michael Han <ha...@apache.org>
Committed: Sat Nov 17 09:33:40 2018 -0800
----------------------------------------------------------------------
.../apache/zookeeper/server/DumbWatcher.java | 3 +-
.../apache/zookeeper/server/NIOServerCnxn.java | 47 +++-----------------
.../zookeeper/server/NettyServerCnxn.java | 35 +++------------
.../org/apache/zookeeper/server/ServerCnxn.java | 44 +++++++++++++++---
.../zookeeper/server/ZooKeeperServer.java | 32 +++++++------
.../server/quorum/FollowerZooKeeperServer.java | 10 ++---
.../server/quorum/LeaderZooKeeperServer.java | 10 ++---
.../apache/zookeeper/server/MockServerCnxn.java | 6 ++-
8 files changed, 85 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
index ff17181..f384d7c 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
@@ -41,6 +41,7 @@ public class DumbWatcher extends ServerCnxn {
}
public DumbWatcher(long sessionId) {
+ super(null);
this.sessionId = sessionId;
}
@@ -75,7 +76,7 @@ public class DumbWatcher extends ServerCnxn {
void enableRecv() { }
@Override
- void disableRecv() { }
+ void disableRecv(boolean waitDisableRecv) { }
@Override
protected ServerStats serverStats() { return null; }
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index c344c65..b48eb3d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -77,25 +77,16 @@ public class NIOServerCnxn extends ServerCnxn {
private int sessionTimeout;
- private final ZooKeeperServer zkServer;
-
- /**
- * The number of requests that have been submitted but not yet responded to.
- */
- private final AtomicInteger outstandingRequests = new AtomicInteger(0);
-
/**
* This is the id that uniquely identifies the session of a client. Once
* this session is no longer active, the ephemeral nodes will go away.
*/
private long sessionId;
- private final int outstandingLimit;
-
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
SelectionKey sk, NIOServerCnxnFactory factory,
SelectorThread selectorThread) throws IOException {
- this.zkServer = zk;
+ super(zk);
this.sock = sock;
this.sk = sk;
this.factory = factory;
@@ -103,11 +94,6 @@ public class NIOServerCnxn extends ServerCnxn {
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
- if (zk != null) {
- outstandingLimit = zk.getGlobalOutstandingLimit();
- } else {
- outstandingLimit = 1;
- }
sock.socket().setTcpNoDelay(true);
/* set socket linger to false, so that socket close does not block */
sock.socket().setSoLinger(false, -1);
@@ -380,21 +366,6 @@ public class NIOServerCnxn extends ServerCnxn {
zkServer.processPacket(this, incomingBuffer);
}
- // Only called as callback from zkServer.processPacket()
- protected void incrOutstandingRequests(RequestHeader h) {
- if (h.getXid() >= 0) {
- outstandingRequests.incrementAndGet();
- // check throttling
- int inProcess = zkServer.getInProcess();
- if (inProcess > outstandingLimit) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Throttling recv " + inProcess);
- }
- disableRecv();
- }
- }
- }
-
// returns whether we are interested in writing, which is determined
// by whether we have any pending buffers on the output queue or not
private boolean getWriteInterest() {
@@ -411,7 +382,9 @@ public class NIOServerCnxn extends ServerCnxn {
// Throttle acceptance of new requests. If this entailed a state change,
// register an interest op update request with the selector.
- public void disableRecv() {
+ //
+ // Don't support wait disable receive in NIO, ignore the parameter
+ public void disableRecv(boolean waitDisableRecv) {
if (throttled.compareAndSet(false, true)) {
requestInterestOpsUpdate();
}
@@ -566,10 +539,6 @@ public class NIOServerCnxn extends ServerCnxn {
return zkServer != null && zkServer.isRunning();
}
- public long getOutstandingRequests() {
- return outstandingRequests.get();
- }
-
/*
* (non-Javadoc)
*
@@ -689,13 +658,7 @@ public class NIOServerCnxn extends ServerCnxn {
public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
super.sendResponse(h, r, tag);
- if (h.getXid() > 0) {
- // check throttling
- if (outstandingRequests.decrementAndGet() < 1 ||
- zkServer.getInProcess() < outstandingLimit) {
- enableRecv();
- }
- }
+ decrOutstandingAndCheckThrottle(h);
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ab3fd09..f0a8f7f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -61,23 +61,16 @@ public class NettyServerCnxn extends ServerCnxn {
ByteBuffer bbLen = ByteBuffer.allocate(4);
long sessionId;
int sessionTimeout;
- AtomicLong outstandingCount = new AtomicLong();
Certificate[] clientChain;
volatile boolean closingChannel;
- /** The ZooKeeperServer for this connection. May be null if the server
- * is not currently serving requests (for example if the server is not
- * an active quorum participant.
- */
- private volatile ZooKeeperServer zkServer;
-
NettyServerCnxnFactory factory;
boolean initialized;
NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) {
+ super(zks);
this.channel = channel;
this.closingChannel = false;
- this.zkServer = zks;
this.factory = factory;
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
@@ -189,12 +182,7 @@ public class NettyServerCnxn extends ServerCnxn {
return;
}
super.sendResponse(h, r, tag);
- if (h.getXid() > 0) {
- // zks cannot be null otherwise we would not have gotten here!
- if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
- enableRecv();
- }
- }
+ decrOutstandingAndCheckThrottle(h);
}
@Override
@@ -355,10 +343,6 @@ public class NettyServerCnxn extends ServerCnxn {
}
if (initialized) {
zks.processPacket(this, bb);
-
- if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
- disableRecvNoWait();
- }
} else {
LOG.debug("got conn req request from "
+ getRemoteSocketAddress());
@@ -420,21 +404,16 @@ public class NettyServerCnxn extends ServerCnxn {
}
@Override
- public void disableRecv() {
- disableRecvNoWait().awaitUninterruptibly();
- }
-
- private ChannelFuture disableRecvNoWait() {
+ public void disableRecv(boolean waitDisableRecv) {
throttled = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Throttling - disabling recv " + this);
}
- return channel.setReadable(false);
- }
+ ChannelFuture cf = channel.setReadable(false);
- @Override
- public long getOutstandingRequests() {
- return outstandingCount.longValue();
+ if (waitDisableRecv) {
+ cf.awaitUninterruptibly();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 0822f19..8e145cb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -68,8 +68,39 @@ public abstract class ServerCnxn implements Stats, Watcher {
private volatile boolean stale = false;
+ AtomicLong outstandingCount = new AtomicLong();
+
+ /** The ZooKeeperServer for this connection. May be null if the server
+ * is not currently serving requests (for example if the server is not
+ * an active quorum participant.
+ */
+ final ZooKeeperServer zkServer;
+
+ public ServerCnxn(final ZooKeeperServer zkServer) {
+ this.zkServer = zkServer;
+ }
+
abstract int getSessionTimeout();
+ public void incrOutstandingAndCheckThrottle(RequestHeader h) {
+ if (h.getXid() <= 0) {
+ return;
+ }
+ if (zkServer.shouldThrottle(outstandingCount.incrementAndGet())) {
+ disableRecv(false);
+ }
+ }
+
+ // will be called from zkServer.processPacket
+ public void decrOutstandingAndCheckThrottle(ReplyHeader h) {
+ if (h.getXid() <= 0) {
+ return;
+ }
+ if (!zkServer.shouldThrottle(outstandingCount.decrementAndGet())) {
+ enableRecv();
+ }
+ }
+
abstract void close();
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
@@ -119,8 +150,12 @@ public abstract class ServerCnxn implements Stats, Watcher {
abstract void enableRecv();
- abstract void disableRecv();
+ void disableRecv() {
+ disableRecv(true);
+ }
+ abstract void disableRecv(boolean waitDisableRecv);
+
abstract void setSessionTimeout(int sessionTimeout);
protected ZooKeeperSaslServer zooKeeperSaslServer = null;
@@ -207,9 +242,6 @@ public abstract class ServerCnxn implements Stats, Watcher {
return packetsReceived.incrementAndGet();
}
- protected void incrOutstandingRequests(RequestHeader h) {
- }
-
protected long incrPacketsSent() {
return packetsSent.incrementAndGet();
}
@@ -241,7 +273,9 @@ public abstract class ServerCnxn implements Stats, Watcher {
return (Date)established.clone();
}
- public abstract long getOutstandingRequests();
+ public long getOutstandingRequests() {
+ return outstandingCount.longValue();
+ }
public long getPacketsReceived() {
return packetsReceived.longValue();
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 02df585..3ab81e7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -86,10 +86,16 @@ import org.slf4j.LoggerFactory;
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected static final Logger LOG;
+ public static final String GLOBAL_OUTSTANDING_LIMIT = "zookeeper.globalOutstandingLimit";
+ protected static int globalOutstandingLimit = 1000;
+
static {
LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
Environment.logEnv("Server environment:", LOG);
+
+ globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000);
+ LOG.info("{} = {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
}
protected ZooKeeperServerBean jmxServerBean;
@@ -858,17 +864,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
- public int getGlobalOutstandingLimit() {
- String sc = System.getProperty("zookeeper.globalOutstandingLimit");
- int limit;
- try {
- limit = Integer.parseInt(sc);
- } catch (Exception e) {
- limit = 1000;
- }
- return limit;
- }
-
public void setServerCnxnFactory(ServerCnxnFactory factory) {
serverCnxnFactory = factory;
}
@@ -1095,7 +1090,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public boolean shouldThrottle(long outStandingCount) {
- if (getGlobalOutstandingLimit() < getInProcess()) {
+ if (globalOutstandingLimit < getInProcess()) {
return outStandingCount > 0;
}
return false;
@@ -1107,6 +1102,18 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
+
+ // Need to increase the outstanding request count first, otherwise
+ // there might be a race condition that it enabled recv after
+ // processing request and then disabled when check throttling.
+ //
+ // Be aware that we're actually checking the global outstanding
+ // request before this request.
+ //
+ // It's fine if the IOException thrown before we decrease the count
+ // in cnxn, since it will close the cnxn anyway.
+ cnxn.incrOutstandingAndCheckThrottle(h);
+
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
@@ -1157,7 +1164,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
} else {
- cnxn.incrOutstandingRequests(h);
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
index 35ef055..ec529de 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
@@ -60,6 +60,10 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
super(logFactory, self.tickTime, self.minSessionTimeout,
self.maxSessionTimeout, zkDb, self);
this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
+
+ int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
+ globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor;
+ LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
}
public Follower getFollower(){
@@ -123,12 +127,6 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
}
@Override
- public int getGlobalOutstandingLimit() {
- int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
- return super.getGlobalOutstandingLimit() / divisor;
- }
-
- @Override
public String getState() {
return "follower";
}
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
index 4f8c095..c6f60e1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
@@ -55,6 +55,10 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
*/
LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
+
+ int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
+ globalOutstandingLimit = Integer.getInteger(GLOBAL_OUTSTANDING_LIMIT, 1000) / divisor;
+ LOG.info("Override {} to {}", GLOBAL_OUTSTANDING_LIMIT, globalOutstandingLimit);
}
public Leader getLeader(){
@@ -103,12 +107,6 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
}
@Override
- public int getGlobalOutstandingLimit() {
- int divisor = self.getQuorumSize() > 2 ? self.getQuorumSize() - 1 : 1;
- return super.getGlobalOutstandingLimit() / divisor;
- }
-
- @Override
public void createSessionTracker() {
sessionTracker = new LeaderSessionTracker(
this, getZKDatabase().getSessionWithTimeOuts(),
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/db074423/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
index 7ae0004..20cf36d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
@@ -29,6 +29,10 @@ public class MockServerCnxn extends ServerCnxn {
public Certificate[] clientChain;
public boolean secure;
+ public MockServerCnxn() {
+ super(null);
+ }
+
@Override
int getSessionTimeout() {
return 0;
@@ -84,7 +88,7 @@ public class MockServerCnxn extends ServerCnxn {
}
@Override
- void disableRecv() {
+ void disableRecv(boolean waitDisableRecv) {
}
@Override