You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2019/07/08 21:26:21 UTC
[zookeeper] branch master updated: ZOOKEEPER-3243: Add server-side
request throttling
This is an automated email from the ASF dual-hosted git repository.
hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 7b3de52 ZOOKEEPER-3243: Add server-side request throttling
7b3de52 is described below
commit 7b3de52cdb15068aa343879ae283f4e456c68f39
Author: Jie Huang <ji...@fb.com>
AuthorDate: Mon Jul 8 14:26:11 2019 -0700
ZOOKEEPER-3243: Add server-side request throttling
Author: Jie Huang <ji...@fb.com>
Author: Joseph Blomstedt <jd...@fb.com>
Reviewers: Michael Han <ha...@apache.org>
Closes #986 from jhuan31/ZOOKEEPER-3243
---
.../src/main/resources/markdown/zookeeperAdmin.md | 60 +++--
.../zookeeper/server/FinalRequestProcessor.java | 5 +
.../org/apache/zookeeper/server/NIOServerCnxn.java | 33 ++-
.../java/org/apache/zookeeper/server/Request.java | 69 ++++++
.../apache/zookeeper/server/RequestThrottler.java | 267 +++++++++++++++++++++
.../org/apache/zookeeper/server/ServerCnxn.java | 31 ++-
.../org/apache/zookeeper/server/ServerMetrics.java | 10 +
.../apache/zookeeper/server/ZooKeeperServer.java | 63 ++++-
.../zookeeper/server/ZooKeeperServerBean.java | 47 ++++
.../zookeeper/server/ZooKeeperServerMXBean.java | 15 ++
.../zookeeper/server/RequestThrottlerTest.java | 242 +++++++++++++++++++
.../zookeeper/server/SessionTrackerTest.java | 4 +
12 files changed, 814 insertions(+), 32 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 27654b0..b0b07dc 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -229,7 +229,7 @@ ensemble:
7. If your configuration file is set up, you can start a
ZooKeeper server:
- $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf
+ $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf
QuorumPeerMain starts a ZooKeeper server,
[JMX](http://java.sun.com/javase/technologies/core/mntr-mgmt/javamanagement/)
@@ -833,7 +833,7 @@ property, when available, is noted below.
* *serverCnxnFactory* :
(Java system property: **zookeeper.serverCnxnFactory**)
- Specifies ServerCnxnFactory implementation.
+ Specifies ServerCnxnFactory implementation.
This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
Default is `NIOServerCnxnFactory`.
@@ -857,6 +857,36 @@ property, when available, is noted below.
Does not affect the limit defined by *flushDelay*.
Default is 1000.
+* *requestThrottleLimit* :
+ (Java system property: **zookeeper.request_throttle_max_requests**)
+ **New in 3.6.0:**
+ The total number of outstanding requests allowed before the RequestThrottler starts stalling. When set to 0, throttling is disabled. The default is 0.
+
+* *requestThrottleStallTime* :
+ (Java system property: **zookeeper.request_throttle_stall_time**)
+ **New in 3.6.0:**
+ The maximum time (in milliseconds) the RequestThrottler thread waits to be notified before re-checking whether it may proceed with processing a request. The default is 100.
+
+* *requestThrottleDropStale* :
+ (Java system property: **zookeeper.request_throttle_drop_stale**)
+ **New in 3.6.0:**
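+ When enabled, the throttler will drop stale requests rather than issue them to the request pipeline. A stale request is a request sent by a connection that is now closed, and/or a request that will have a request latency higher than the sessionTimeout; which of these checks apply is controlled by *requestStaleConnectionCheck* and *requestStaleLatencyCheck*. Stale requests are only dropped while throttling is enabled (*requestThrottleLimit* greater than 0). The default is true.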
+
+* *requestStaleLatencyCheck* :
+ (Java system property: **zookeeper.request_stale_latency_check**)
+ **New in 3.6.0:**
+ When enabled, a request is considered stale if the request latency is higher than its associated session timeout. Disabled by default.
+
+* *requestStaleConnectionCheck* :
+ (Java system property: **zookeeper.request_stale_connection_check**)
+ **New in 3.6.0:**
+ When enabled, a request is considered stale if the request's connection has closed. Enabled by default.
+
+* *zookeeper.request_throttler.shutdownTimeout* :
+ (Java system property only)
+ **New in 3.6.0:**
+ The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.
+
<a name="sc_clusterOptions"></a>
#### Cluster Options
@@ -1108,20 +1138,20 @@ encryption/authentication/authorization performed by the service.
(Java system property: **zookeeper.sslQuorum**)
**New in 3.5.5:**
Enables encrypted quorum communication. Default is `false`.
-
+
* *ssl.keyStore.location and ssl.keyStore.password* and *ssl.quorum.keyStore.location* and *ssl.quorum.keyStore.password* :
(Java system properties: **zookeeper.ssl.keyStore.location** and **zookeeper.ssl.keyStore.password** and **zookeeper.ssl.quorum.keyStore.location** and **zookeeper.ssl.quorum.keyStore.password**)
**New in 3.5.5:**
Specifies the file path to a Java keystore containing the local
credentials to be used for client and quorum TLS connections, and the
password to unlock the file.
-
+
* *ssl.keyStore.type* and *ssl.quorum.keyStore.type* :
(Java system properties: **zookeeper.ssl.keyStore.type** and **zookeeper.ssl.quorum.keyStore.type**)
**New in 3.5.5:**
Specifies the file format of client and quorum keystores. Values: JKS, PEM or null (detect by filename).
Default: null
-
+
* *ssl.trustStore.location* and *ssl.trustStore.password* and *ssl.quorum.trustStore.location* and *ssl.quorum.trustStore.password* :
(Java system properties: **zookeeper.ssl.trustStore.location** and **zookeeper.ssl.trustStore.password** and **zookeeper.ssl.quorum.trustStore.location** and **zookeeper.ssl.quorum.trustStore.password**)
**New in 3.5.5:**
@@ -1146,7 +1176,7 @@ encryption/authentication/authorization performed by the service.
**New in 3.5.5:**
Specifies the enabled protocols in client and quorum TLS negotiation.
Default: value of `protocol` property
-
+
* *ssl.ciphersuites* and *ssl.quorum.ciphersuites* :
(Java system properties: **zookeeper.ssl.ciphersuites** and **zookeeper.ssl.quorum.ciphersuites**)
**New in 3.5.5:**
@@ -1161,7 +1191,7 @@ encryption/authentication/authorization performed by the service.
1. Use hardware keystore, loaded in using PKCS11 or something similar.
2. You don't have access to the software keystore, but can retrieve an already-constructed SSLContext from their container.
Default: null
-
+
* *ssl.hostnameVerification* and *ssl.quorum.hostnameVerification* :
(Java system properties: **zookeeper.ssl.hostnameVerification** and **zookeeper.ssl.quorum.hostnameVerification**)
**New in 3.5.5:**
@@ -1180,12 +1210,12 @@ encryption/authentication/authorization performed by the service.
**New in 3.5.5:**
Specifies whether Online Certificate Status Protocol is enabled in client and quorum TLS protocols.
Default: false
-
+
* *ssl.clientAuth* and *ssl.quorum.clientAuth* :
(Java system properties: **zookeeper.ssl.clientAuth** and **zookeeper.ssl.quorum.clientAuth**)
**New in 3.5.5:**
TBD
-
+
* *ssl.handshakeDetectionTimeoutMillis* and *ssl.quorum.handshakeDetectionTimeoutMillis* :
(Java system properties: **zookeeper.ssl.handshakeDetectionTimeoutMillis** and **zookeeper.ssl.quorum.handshakeDetectionTimeoutMillis**)
**New in 3.5.5:**
@@ -1468,20 +1498,20 @@ and quorum communication protocols.
One keystore should be created for each ZK instance.
-In this example we generate a self-signed certificate and store it
-together with the private key in `keystore.jks`. This is suitable for
-testing purposes, but you probably need an official certificate to sign
+In this example we generate a self-signed certificate and store it
+together with the private key in `keystore.jks`. This is suitable for
+testing purposes, but you probably need an official certificate to sign
your keys in a production environment.
Please note that the alias (`-alias`) and the distinguished name (`-dname`)
-must match the hostname of the machine that is associated with, otherwise
+must match the hostname of the machine the keystore is associated with; otherwise
hostname verification won't work.
```
keytool -genkeypair -alias $(hostname -f) -keyalg RSA -keysize 2048 -dname "cn=$(hostname -f)" -keypass password -keystore keystore.jks -storepass password
```
-2. Extract the signed public key (certificate) from keystore
+2. Extract the signed public key (certificate) from keystore
*This step might only be necessary for self-signed certificates.*
@@ -1569,7 +1599,7 @@ and do another rolling restart
```
sslQuorum=true
portUnification=false
-```
+```
<a name="sc_zkCommands"></a>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index e15d189..cd0c0eb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -217,6 +217,11 @@ public class FinalRequestProcessor implements RequestProcessor {
if (LOG.isDebugEnabled()) {
LOG.debug("{}",request);
}
+
+ if (request.isStale()) {
+ ServerMetrics.getMetrics().STALE_REPLIES.add(1);
+ }
+
switch (request.type) {
case OpCode.ping: {
lastOp = "PING";
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index fb7ce4d..48aadfc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -150,17 +150,31 @@ public class NIOServerCnxn extends ServerCnxn {
requestInterestOpsUpdate();
}
+    /**
+     * Handles a socket read that reported end-of-stream. Usually the client
+     * closed the connection, in most cases because the server did not answer
+     * within 2/3 of the session timeout, which may point at server health or
+     * performance problems. The connection is therefore flagged stale and the
+     * drop is counted before the failure is reported.
+     *
+     * @throws EndOfStreamException always, naming the remote address and session
+     */
+    private void handleFailedRead() throws EndOfStreamException {
+        setStale();
+        ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
+        String message = "Unable to read additional data from client,"
+            + " it probably closed the socket:"
+            + " address = " + sock.socket().getRemoteSocketAddress() + ","
+            + " session = 0x" + Long.toHexString(sessionId);
+        throw new EndOfStreamException(message, DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+    }
+
/** Read the request payload (everything following the length prefix) */
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
- ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
- throw new EndOfStreamException(
- "Unable to read additional data from client sessionid 0x"
- + Long.toHexString(sessionId)
- + ", likely client has closed socket",
- DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+ handleFailedRead();
}
}
@@ -318,12 +332,7 @@ public class NIOServerCnxn extends ServerCnxn {
if (k.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
- ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
- throw new EndOfStreamException(
- "Unable to read additional data from client sessionid 0x"
- + Long.toHexString(sessionId)
- + ", likely client has closed socket",
- DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+ handleFailedRead();
}
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index d730a66..e4fdb4d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -39,6 +39,16 @@ import org.apache.zookeeper.txn.TxnHeader;
public class Request {
public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
+    /**
+     * When true, a request whose connection has been flagged stale or invalid
+     * is itself treated as stale. Read from the system property
+     * {@code zookeeper.request_stale_connection_check}; enabled by default.
+     */
+    private static volatile boolean staleConnectionCheck =
+        Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_connection_check", "true"));
+
+    /**
+     * When true, a request that has been pending longer than its connection's
+     * session timeout is treated as stale. Read from the system property
+     * {@code zookeeper.request_stale_latency_check}; disabled by default.
+     */
+    private static volatile boolean staleLatencyCheck =
+        Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_latency_check", "false"));
+
public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
this.cnxn = cnxn;
this.sessionId = sessionId;
@@ -133,6 +143,65 @@ public class Request {
this.txn = txn;
}
+    /** @return the connection this request arrived on; may be null (e.g. requestOfDeath) */
+    public ServerCnxn getConnection() {
+        return cnxn;
+    }
+
+    /** @return whether requests older than their session timeout count as stale */
+    public static boolean getStaleLatencyCheck() {
+        return staleLatencyCheck;
+    }
+
+    /** @param check whether requests older than their session timeout count as stale */
+    public static void setStaleLatencyCheck(boolean check) {
+        staleLatencyCheck = check;
+    }
+
+    /** @return whether requests on a stale/invalid connection count as stale */
+    public static boolean getStaleConnectionCheck() {
+        return staleConnectionCheck;
+    }
+
+    /** @param check whether requests on a stale/invalid connection count as stale */
+    public static void setStaleConnectionCheck(boolean check) {
+        staleConnectionCheck = check;
+    }
+
+    /**
+     * Decides whether this request is stale, i.e. no longer worth processing.
+     * Requests without a connection and closeSession requests are never
+     * stale. Otherwise the request is stale when
+     * <ul>
+     * <li>the connection check is enabled and its connection is flagged
+     * stale or invalid, or</li>
+     * <li>the latency check is enabled and more than the connection's
+     * session timeout has elapsed since the request was created.</li>
+     * </ul>
+     */
+    public boolean isStale() {
+        // closeSession must be able to outlive the session so it can clean up
+        // the session's state.
+        if (cnxn == null || type == OpCode.closeSession) {
+            return false;
+        }
+
+        boolean connectionGone = staleConnectionCheck && (cnxn.isStale() || cnxn.isInvalid());
+        if (connectionGone) {
+            return true;
+        }
+
+        return staleLatencyCheck
+            && (Time.currentElapsedTime() - createTime) > cnxn.getSessionTimeout();
+    }
+
+    /**
+     * Reports whether this request has to be discarded because an earlier
+     * request on the same connection was already dropped (the connection was
+     * marked invalid); dropping the rest as well preserves per-connection
+     * ordering semantics.
+     */
+    public boolean mustDrop() {
+        if (cnxn == null) {
+            return false;
+        }
+        return cnxn.isInvalid();
+    }
+
/**
* is the packet type a valid packet in zookeeper
*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
new file mode 100644
index 0000000..0e9772b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.common.Time;
+
+/**
+ * When enabled, the RequestThrottler limits the number of outstanding requests
+ * currently submitted to the request processor pipeline. The throttler augments
+ * the limit imposed by the <code>globalOutstandingLimit</code> that is enforced
+ * by the connection layer ({@link NIOServerCnxn}, {@link NettyServerCnxn}).
+ *
+ * The connection layer limit applies backpressure against the TCP connection by
+ * disabling selection on connections once the request limit is reached. However,
+ * the connection layer always allows a connection to send at least one request
+ * before disabling selection on that connection. Thus, in a scenario with 40000
+ * client connections, the total number of requests inflight may be as high as
+ * 40000 even if the <code>globalOustandingLimit</code> was set lower.
+ *
+ * The RequestThrottler addresses this issue by adding additional queueing. When
+ * enabled, client connections no longer submit requests directly to the request
+ * processor pipeline but instead to the RequestThrottler. The RequestThrottler
+ * is then responsible for issuing requests to the request processors, and
+ * enforces a separate <code>maxRequests</code> limit. If the total number of
+ * outstanding requests is higher than <code>maxRequests</code>, the throttler
+ * will continually stall for <code>stallTime</code> milliseconds until
+ * underlimit.
+ *
+ * The RequestThrottler can also optionally drop stale requests rather than
+ * submit them to the processor pipeline. A stale request is a request sent
+ * by a connection that is already closed, and/or a request whose latency
+ * will end up being higher than its associated session timeout. The notion
+ * of staleness is configurable, @see Request for more details.
+ *
+ * To ensure ordering guarantees, if a request is ever dropped from a connection
+ * that connection is closed and flagged as invalid. All subsequent requests
+ * inflight from that connection are then dropped as well.
+ */
+public class RequestThrottler extends ZooKeeperCriticalThread {
+ private static final Logger LOG = LoggerFactory.getLogger(RequestThrottler.class);
+
+ private final LinkedBlockingQueue<Request> submittedRequests =
+ new LinkedBlockingQueue<Request>();
+
+ private final ZooKeeperServer zks;
+ private volatile boolean stopping;
+ private volatile boolean killed;
+
+ private static final String SHUTDOWN_TIMEOUT = "zookeeper.request_throttler.shutdownTimeout";
+ private static int shutdownTimeout = 10000;
+
+ static {
+ shutdownTimeout = Integer.getInteger(SHUTDOWN_TIMEOUT, 10000);
+ LOG.info("{} = {}", SHUTDOWN_TIMEOUT, shutdownTimeout);
+ }
+
+ /**
+ * The total number of outstanding requests allowed before the throttler
+ * starts stalling.
+ *
+ * When maxRequests = 0, throttling is disabled.
+ */
+ private static volatile int maxRequests =
+ Integer.getInteger("zookeeper.request_throttle_max_requests", 0);
+
+ /**
+ * The time (in milliseconds) this is the maximum time for which throttler
+ * thread may wait to be notified that it may proceed processing a request.
+ */
+ private static volatile int stallTime =
+ Integer.getInteger("zookeeper.request_throttle_stall_time", 100);
+
+ /**
+ * When true, the throttler will drop stale requests rather than issue
+ * them to the request pipeline. A stale request is a request sent by
+ * a connection that is now closed, and/or a request that will have a
+ * request latency higher than the sessionTimeout. The staleness of
+ * a request is tunable property, @see Request for details.
+ */
+ private static volatile boolean dropStaleRequests = Boolean.parseBoolean(
+ System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
+
+ public RequestThrottler(ZooKeeperServer zks) {
+ super("RequestThrottler", zks.getZooKeeperServerListener());
+ this.zks = zks;
+ this.stopping = false;
+ this.killed = false;
+ }
+
+ public static int getMaxRequests() {
+ return maxRequests;
+ }
+
+ public static void setMaxRequests(int requests) {
+ maxRequests = requests;
+ }
+
+ public static int getStallTime() {
+ return stallTime;
+ }
+
+ public static void setStallTime(int time) {
+ stallTime = time;
+ }
+
+ public static boolean getDropStaleRequests() {
+ return dropStaleRequests;
+ }
+
+ public static void setDropStaleRequests(boolean drop) {
+ dropStaleRequests = drop;
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ if (killed) {
+ break;
+ }
+
+ Request request = submittedRequests.take();
+ if (Request.requestOfDeath == request) {
+ break;
+ }
+
+ if (request.mustDrop()) {
+ continue;
+ }
+
+ // Throttling is disabled when maxRequests = 0
+ if (maxRequests > 0) {
+ while (!killed) {
+ if (dropStaleRequests && request.isStale()) {
+ // Note: this will close the connection
+ dropRequest(request);
+ ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
+ request = null;
+ break;
+ }
+ if (zks.getInProcess() < maxRequests) {
+ break;
+ }
+ throttleSleep(stallTime);
+ }
+ }
+
+ if (killed) {
+ break;
+ }
+
+ // A dropped stale request will be null
+ if (request != null) {
+ if (request.isStale()) {
+ ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
+ }
+ zks.submitRequestNow(request);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Unexpected interruption", e);
+ }
+ int dropped = drainQueue();
+ LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
+ }
+
+ private synchronized void throttleSleep(int stallTime) {
+ try {
+ ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
+ this.wait(stallTime);
+ } catch(InterruptedException ie) {
+ return;
+ }
+ }
+
+ @SuppressFBWarnings(value = "NN_NAKED_NOTIFY",
+ justification = "state change is in ZooKeeperServer.decInProgress() ")
+ public synchronized void throttleWake() {
+ this.notify();
+ }
+
+ private int drainQueue() {
+ // If the throttler shutdown gracefully, the queue will be empty.
+ // However, if the shutdown time limit was reached and the throttler
+ // was killed, we have no other option than to drop all remaining
+ // requests on the floor.
+ int dropped = 0;
+ Request request;
+ LOG.info("Draining request throttler queue");
+ while ((request = submittedRequests.poll()) != null) {
+ dropped += 1;
+ dropRequest(request);
+ }
+ return dropped;
+ }
+
+ private void dropRequest(Request request) {
+ // Since we're dropping a request on the floor, we must mark the
+ // connection as invalid to ensure any future requests from this
+ // connection are also dropped in order to ensure ordering
+ // semantics.
+ ServerCnxn conn = request.getConnection();
+ if (conn != null) {
+ // Note: this will close the connection
+ conn.setInvalid();
+ }
+ }
+
+ public void submitRequest(Request request) {
+ if (stopping) {
+ LOG.debug("Shutdown in progress. Request cannot be processed");
+ dropRequest(request);
+ } else {
+ submittedRequests.add(request);
+ }
+ }
+
+ public int getInflight() {
+ return submittedRequests.size();
+ }
+
+ @SuppressFBWarnings("DM_EXIT")
+ public void shutdown() {
+ // Try to shutdown gracefully
+ LOG.info("Shutting down");
+ stopping = true;
+ submittedRequests.add(Request.requestOfDeath);
+ try {
+ this.join(shutdownTimeout);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for {} to finish", this);
+ }
+
+ // Forcibly shutdown if necessary in order to ensure request
+ // queue is drained.
+ killed = true;
+ try {
+ this.join();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for {} to finish", this);
+ //TODO apply ZOOKEEPER-575 and remove this line.
+ System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+ }
+ }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 6953e3a..42d28e2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -68,8 +68,6 @@ public abstract class ServerCnxn implements Stats, Watcher {
*/
boolean isOldClient = true;
- private volatile boolean stale = false;
-
AtomicLong outstandingCount = new AtomicLong();
/** The ZooKeeperServer for this connection. May be null if the server
@@ -123,6 +121,22 @@ public abstract class ServerCnxn implements Stats, Watcher {
this.zkServer = zkServer;
}
+    /**
+     * Flag that indicates that this connection is known to be closed/closing
+     * and from which we can optionally ignore outstanding requests as part
+     * of request throttling (set via setStale(), e.g. when a socket read hits
+     * end-of-stream). This flag may be false when a connection is actually
+     * closed (false negative), but should never be true while the connection
+     * is still alive (false positive).
+     */
+    private volatile boolean stale = false;
+
+    /**
+     * Flag that indicates that a request for this connection was previously
+     * dropped as part of request throttling and therefore all future requests
+     * must also be dropped to ensure ordering guarantees.
+     */
+    private volatile boolean invalid = false;
+
abstract int getSessionTimeout();
public void incrOutstandingAndCheckThrottle(RequestHeader h) {
@@ -276,6 +290,19 @@ public abstract class ServerCnxn implements Stats, Watcher {
stale = true;
}
+    /** @return true once a request on this connection has been dropped by the throttler */
+    public boolean isInvalid() {
+        return invalid;
+    }
+
+    /**
+     * Marks this connection invalid so that later requests on it are dropped
+     * as well. The first call also invokes sendCloseSession(), unless the
+     * connection is already flagged stale; subsequent calls do nothing.
+     * NOTE(review): the check-then-set is not atomic, so two concurrent first
+     * calls could both invoke sendCloseSession().
+     */
+    public void setInvalid() {
+        if (invalid) {
+            return;
+        }
+        if (!stale) {
+            sendCloseSession();
+        }
+        invalid = true;
+    }
+
protected void packetReceived(long bytes) {
incrPacketsReceived();
ServerStats serverStats = serverStats();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 366770b..ca05f94 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -220,6 +220,11 @@ public final class ServerMetrics {
ACK_LATENCY = metricsContext.getSummarySet("ack_latency", DetailLevel.ADVANCED);
PROPOSAL_COUNT = metricsContext.getCounter("proposal_count");
QUIT_LEADING_DUE_TO_DISLOYAL_VOTER = metricsContext.getCounter("quit_leading_due_to_disloyal_voter");
+
+ STALE_REQUESTS = metricsContext.getCounter("stale_requests");
+ STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
+ STALE_REPLIES = metricsContext.getCounter("stale_replies");
+ REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
}
/**
@@ -414,6 +419,11 @@ public final class ServerMetrics {
*/
public final Counter ENSEMBLE_AUTH_SKIP;
+    /**
+     * Requests that were stale when the RequestThrottler issued them to the
+     * request pipeline anyway (i.e. they were not dropped).
+     */
+    public final Counter STALE_REQUESTS;
+    /** Stale requests dropped by the RequestThrottler instead of being issued. */
+    public final Counter STALE_REQUESTS_DROPPED;
+    /** Requests that were already stale when FinalRequestProcessor handled them. */
+    public final Counter STALE_REPLIES;
+    /** Times the RequestThrottler stalled waiting for the in-process count to drop. */
+    public final Counter REQUEST_THROTTLE_WAIT_COUNT;
+
private final MetricsProvider metricsProvider;
public void resetAll() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f5c770b..4258916 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -40,6 +40,7 @@ import java.util.function.BiConsumer;
import javax.security.sasl.SaslException;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
@@ -79,7 +80,6 @@ import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
* following chain of RequestProcessors to process requests:
@@ -187,6 +187,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// Connection throttling
private BlueThrottle connThrottle;
+    // Created by startRequestThrottler(); null until then, which
+    // enqueueRequest() and decInProcess() check for.
+    @SuppressFBWarnings(
+        value = "IS2_INCONSISTENT_SYNC",
+        justification = "Internally the throttler has a BlockingQueue so once the throttler"
+            + " is created and started, it is thread-safe")
+    private RequestThrottler requestThrottler;
+
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
}
@@ -250,7 +255,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
ZKDatabase zkDb, String initialConfig) {
this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
this.jvmPauseMonitor = jvmPauseMonitor;
- if(jvmPauseMonitor != null) {
+ if (jvmPauseMonitor != null) {
LOG.info("Added JvmPauseMonitor to server");
}
}
@@ -558,6 +563,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
startSessionTracker();
setupRequestProcessors();
+ startRequestThrottler();
+
registerJMX();
startJvmPauseMonitor();
@@ -574,6 +581,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
}
+    /**
+     * Creates this server's request throttler and starts its thread. Invoked
+     * during startup once the request processors have been set up.
+     */
+    protected void startRequestThrottler() {
+        RequestThrottler throttler = new RequestThrottler(this);
+        requestThrottler = throttler;
+        throttler.start();
+    }
+
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
@@ -674,6 +687,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
// subclasses will do their specific clean up
unregisterMetrics();
+ if (requestThrottler != null) {
+ requestThrottler.shutdown();
+ }
+
// Since sessionTracker and syncThreads poll we just have to
// set running to false and they will detect it during the poll
// interval.
@@ -735,12 +752,26 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
public void decInProcess() {
requestsInProcess.decrementAndGet();
+ // The in-process count just dropped: wake the throttler thread in case it
+ // is stalled waiting for the count to fall below its limit.
+ if (requestThrottler != null) {
+ requestThrottler.throttleWake();
+ }
}
public int getInProcess() {
return requestsInProcess.get();
}
+    /**
+     * @return number of requests queued in the request throttler that have not
+     *         yet been issued to the processor pipeline, or 0 if the throttler
+     *         has not been created
+     */
+    public int getInflight() {
+        return requestThrottleInflight();
+    }
+
+    private int requestThrottleInflight() {
+        RequestThrottler throttler = requestThrottler;
+        return (throttler == null) ? 0 : throttler.getInflight();
+    }
+
/**
* This structure is used to facilitate information sharing between PrepRP
* and FinalRP.
@@ -910,6 +941,32 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public void submitRequest(Request si) {
+ // Every request is routed through the request throttler's queue.
+ enqueueRequest(si);
+ }
+
+    /**
+     * Hands a request to the request throttler. If the throttler does not
+     * exist yet, waits (in slices of up to 1s) while the server is still in
+     * the INITIAL state, because startup creates the throttler only after the
+     * request processor chain has been set up.
+     *
+     * @param si the request to submit
+     * @throws RuntimeException ("Not started") if the throttler still does not
+     *         exist after waiting
+     */
+    public void enqueueRequest(Request si) {
+        if (requestThrottler == null) {
+            synchronized (this) {
+                try {
+                    // Since all requests are passed to the request
+                    // processor it should wait for setting up the request
+                    // processor chain. The state will be updated to RUNNING
+                    // after the setup.
+                    while (state == State.INITIAL) {
+                        wait(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.warn("Unexpected interruption", e);
+                    // Restore the interrupt status instead of swallowing it so
+                    // the calling thread can still observe the interruption.
+                    Thread.currentThread().interrupt();
+                }
+                if (requestThrottler == null) {
+                    throw new RuntimeException("Not started");
+                }
+            }
+        }
+        requestThrottler.submitRequest(si);
+    }
+
+ public void submitRequestNow(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
@@ -1224,7 +1281,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
}
public boolean shouldThrottle(long outStandingCount) {
- if (getGlobalOutstandingLimit() < getInProcess()) {
+ if (getGlobalOutstandingLimit() < getInflight()) {
return outStandingCount > 0;
}
return false;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index 39f542f..92ceab8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -291,6 +291,37 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
ZooKeeperServer.setFlushDelay(delay);
}
+ // Request throttling settings
+ ///////////////////////////////////////////////////////////////////////////
+
+ // The accessors below read and write static RequestThrottler settings, so a
+ // change made through this bean is process-wide rather than per server.
+
+ @Override
+ public int getRequestThrottleLimit() {
+ return RequestThrottler.getMaxRequests();
+ }
+
+ @Override
+ public void setRequestThrottleLimit(int requests) {
+ RequestThrottler.setMaxRequests(requests);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public int getRequestThrottleStallTime() {
+ return RequestThrottler.getStallTime();
+ }
+
+ @Override
+ public void setRequestThrottleStallTime(int time) {
+ RequestThrottler.setStallTime(time);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public boolean getRequestThrottleDropStale() {
+ return RequestThrottler.getDropStaleRequests();
+ }
+
+ @Override
+ public void setRequestThrottleDropStale(boolean drop) {
+ RequestThrottler.setDropStaleRequests(drop);
+ }
+
///////////////////////////////////////////////////////////////////////////
@Override
@@ -303,6 +334,14 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
ZooKeeperServer.setMaxWriteQueuePollTime(delay);
}
+ // Delegates to a static Request setting: process-wide, not per server.
+ @Override
+ public boolean getRequestStaleLatencyCheck() {
+ return Request.getStaleLatencyCheck();
+ }
+
+ @Override
+ public void setRequestStaleLatencyCheck(boolean check) {
+ Request.setStaleLatencyCheck(check);
+ }
+
///////////////////////////////////////////////////////////////////////////
@Override
@@ -314,4 +353,12 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
public void setMaxBatchSize(int size) {
ZooKeeperServer.setMaxBatchSize(size);
}
+
+ // Delegates to a static Request setting: process-wide, not per server.
+ @Override
+ public boolean getRequestStaleConnectionCheck() {
+ return Request.getStaleConnectionCheck();
+ }
+
+ @Override
+ public void setRequestStaleConnectionCheck(boolean check) {
+ Request.setStaleConnectionCheck(check);
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index a4482d6..4c71eac 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -120,6 +120,21 @@ public interface ZooKeeperServerMXBean {
public double getConnectionDecreaseRatio();
public void setConnectionDecreaseRatio(double val);
+ /**
+ * Maximum number of requests the RequestThrottler lets into the request
+ * processor pipeline at once.
+ */
+ public int getRequestThrottleLimit();
+ public void setRequestThrottleLimit(int requests);
+
+ /**
+ * RequestThrottler stall time. NOTE(review): presumably milliseconds —
+ * confirm against RequestThrottler.
+ */
+ public int getRequestThrottleStallTime();
+ public void setRequestThrottleStallTime(int time);
+
+ /**
+ * Whether the RequestThrottler drops requests belonging to stale
+ * connections instead of passing them down the pipeline.
+ */
+ public boolean getRequestThrottleDropStale();
+ public void setRequestThrottleDropStale(boolean drop);
+
+ /**
+ * Toggles the Request stale-latency check. The exact staleness criterion
+ * lives in Request; see Request.setStaleLatencyCheck.
+ */
+ public boolean getRequestStaleLatencyCheck();
+ public void setRequestStaleLatencyCheck(boolean check);
+
+ /**
+ * Toggles the Request stale-connection check; see
+ * Request.setStaleConnectionCheck.
+ */
+ public boolean getRequestStaleConnectionCheck();
+ public void setRequestStaleConnectionCheck(boolean check);
+
/**
* Reset packet and latency statistics
*/
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
new file mode 100644
index 0000000..9eee926
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+
+public class RequestThrottlerTest extends ZKTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(RequestThrottlerTest.class);
+
+ private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+ private final static int TOTAL_REQUESTS = 5;
+ private final static int STALL_TIME = 5000;
+
+ // latch to hold requests in the PrepRequestProcessor to
+ // keep them from going down the pipeline to reach the final
+ // request processor, where the number of in process requests
+ // will be decreased
+ CountDownLatch resumeProcess = null;
+
+ // latch to make sure all requests are submitted
+ CountDownLatch submitted = null;
+
+ // latch to make sure all requests entered the pipeline
+ CountDownLatch entered = null;
+
+ ZooKeeperServer zks = null;
+ ServerCnxnFactory f = null;
+ ZooKeeper zk = null;
+
+ @Before
+ public void setup() throws Exception {
+ // start a server and create a client
+ File tmpDir = ClientBase.createTmpDir();
+ ClientBase.setupTestEnv();
+ zks = new TestZooKeeperServer(tmpDir, tmpDir, 3000);
+ final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+ f = ServerCnxnFactory.createFactory(PORT, -1);
+ f.startup(zks);
+ LOG.info("starting up the zookeeper server .. waiting");
+ Assert.assertTrue("waiting for server being up",
+ ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+ resumeProcess = null;
+ submitted = null;
+
+ zk = ClientBase.createZKClient(HOSTPORT);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ // shut down the server and the client
+ if (null != zk) {
+ zk.close();
+ }
+
+ if (null != f) {
+ f.shutdown();
+ }
+ if (null != zks) {
+ zks.shutdown();
+ }
+ }
+
+ // TestZooKeeperServer
+ // 1. uses our version of PrepRequestProcessor, which can hold the request as long as we want
+ // 2. count the number of submitted requests
+ class TestZooKeeperServer extends ZooKeeperServer {
+ public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
+ super(snapDir, logDir, tickTime);
+ }
+
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ RequestProcessor syncProcessor = new SyncRequestProcessor(this,
+ finalProcessor);
+ ((SyncRequestProcessor) syncProcessor).start();
+ firstProcessor = new TestPrepRequestProcessor(this, syncProcessor);
+ ((TestPrepRequestProcessor) firstProcessor).start();
+ }
+
+ @Override
+ public void submitRequest(Request si) {
+ if (null != submitted) {
+ submitted.countDown();
+ }
+ super.submitRequest(si);
+ }
+ }
+
+ class TestPrepRequestProcessor extends PrepRequestProcessor {
+ public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
+ super(zks, syncProcessor);
+ }
+
+ @Override
+ protected void pRequest(Request request) throws RequestProcessorException {
+ // keep the request in the processor as long as we want
+ if (resumeProcess != null) {
+ try {
+ resumeProcess.await(20, TimeUnit.SECONDS);
+ } catch (Exception e) {
+
+ }
+ }
+
+ if (entered != null) {
+ entered.countDown();
+ }
+
+ super.pRequest(request);
+ }
+ }
+
+ @Test
+ public void testRequestThrottler() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
+
+ // we only allow two requests in the pipeline
+ RequestThrottler.setMaxRequests(2);
+
+ RequestThrottler.setStallTime(STALL_TIME);
+ RequestThrottler.setDropStaleRequests(false);
+
+ // no requests can go through the pipeline unless we raise the latch
+ resumeProcess = new CountDownLatch(1);
+ submitted = new CountDownLatch(TOTAL_REQUESTS);
+ entered = new CountDownLatch(TOTAL_REQUESTS);
+
+ // send 5 requests asynchronously
+ for (int i =0; i < TOTAL_REQUESTS; i++) {
+ zk.create("/request_throttle_test- " + i , ("/request_throttle_test- " + i).getBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+ }
+
+ // make sure the server received all 5 requests
+ submitted.await(5, TimeUnit.SECONDS);
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+ // but only two requests can get into the pipeline because of the throttler
+ Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+ Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+ // let the requests go through the pipeline and the throttler will be waken up to allow more requests
+ // to enter the pipeline
+ resumeProcess.countDown();
+ entered.await(STALL_TIME, TimeUnit.MILLISECONDS);
+
+ metrics = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(TOTAL_REQUESTS, (long)metrics.get("prep_processor_request_queued"));
+ }
+
+ @Test
+ public void testDropStaleRequests() throws Exception {
+ ServerMetrics.getMetrics().resetAll();
+
+ // we only allow two requests in the pipeline
+ RequestThrottler.setMaxRequests(2);
+
+ RequestThrottler.setStallTime(STALL_TIME);
+
+ RequestThrottler.setDropStaleRequests(true);
+
+ // no requests can go through the pipeline unless we raise the latch
+ resumeProcess = new CountDownLatch(1);
+ submitted = new CountDownLatch(TOTAL_REQUESTS);
+
+ // send 5 requests asynchronously
+ for (int i=0; i<TOTAL_REQUESTS; i++) {
+ zk.create("/request_throttle_test- " + i , ("/request_throttle_test- " + i).getBytes(),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+ }
+
+ // make sure the server received all 5 requests
+ submitted.await(5, TimeUnit.SECONDS);
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+ // but only two requests can get into the pipeline because of the throttler
+ Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+ Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+ for (ServerCnxn cnxn : f.cnxns){
+ cnxn.setStale();
+ }
+ zk = null;
+
+ resumeProcess.countDown();
+ LOG.info("raise the latch");
+
+ while (zks.getInflight() > 0) {
+ Thread.sleep(50);
+ }
+
+ // the rest of the 3 requests will be dropped
+ // but only the first one for a connection will be counted
+ metrics = MetricsUtils.currentServerMetrics();
+ Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+ Assert.assertEquals(1, (long)metrics.get("stale_requests_dropped"));
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
index abe9aa0..c2e0b12 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
@@ -31,6 +31,8 @@ import org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Testing zk client session logic in sessiontracker
@@ -48,6 +50,7 @@ public class SessionTrackerTest extends ZKTestCase {
*/
@Test(timeout = 20000)
public void testAddSessionAfterSessionExpiry() throws Exception {
+ RequestThrottler.setMaxRequests(0);
ZooKeeperServer zks = setupSessionTracker();
latch = new CountDownLatch(1);
@@ -127,6 +130,7 @@ public class SessionTrackerTest extends ZKTestCase {
// setup session tracker
zks.createSessionTracker();
zks.startSessionTracker();
+ zks.startRequestThrottler();
return zks;
}