You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2019/07/08 21:26:21 UTC

[zookeeper] branch master updated: ZOOKEEPER-3243: Add server-side request throttling

This is an automated email from the ASF dual-hosted git repository.

hanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b3de52  ZOOKEEPER-3243: Add server-side request throttling
7b3de52 is described below

commit 7b3de52cdb15068aa343879ae283f4e456c68f39
Author: Jie Huang <ji...@fb.com>
AuthorDate: Mon Jul 8 14:26:11 2019 -0700

    ZOOKEEPER-3243: Add server-side request throttling
    
    Author: Jie Huang <ji...@fb.com>
    Author: Joseph Blomstedt <jd...@fb.com>
    
    Reviewers: Michael Han <ha...@apache.org>
    
    Closes #986 from jhuan31/ZOOKEEPER-3243
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  60 +++--
 .../zookeeper/server/FinalRequestProcessor.java    |   5 +
 .../org/apache/zookeeper/server/NIOServerCnxn.java |  33 ++-
 .../java/org/apache/zookeeper/server/Request.java  |  69 ++++++
 .../apache/zookeeper/server/RequestThrottler.java  | 267 +++++++++++++++++++++
 .../org/apache/zookeeper/server/ServerCnxn.java    |  31 ++-
 .../org/apache/zookeeper/server/ServerMetrics.java |  10 +
 .../apache/zookeeper/server/ZooKeeperServer.java   |  63 ++++-
 .../zookeeper/server/ZooKeeperServerBean.java      |  47 ++++
 .../zookeeper/server/ZooKeeperServerMXBean.java    |  15 ++
 .../zookeeper/server/RequestThrottlerTest.java     | 242 +++++++++++++++++++
 .../zookeeper/server/SessionTrackerTest.java       |   4 +
 12 files changed, 814 insertions(+), 32 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 27654b0..b0b07dc 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -229,7 +229,7 @@ ensemble:
 7. If your configuration file is set up, you can start a
   ZooKeeper server:  
 
-        $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf 
+        $ java -cp zookeeper.jar:lib/*:conf org.apache.zookeeper.server.quorum.QuorumPeerMain zoo.conf
 
   QuorumPeerMain starts a ZooKeeper server,
   [JMX](http://java.sun.com/javase/technologies/core/mntr-mgmt/javamanagement/)
@@ -833,7 +833,7 @@ property, when available, is noted below.
 
 * *serverCnxnFactory* :
     (Java system property: **zookeeper.serverCnxnFactory**)
-    Specifies ServerCnxnFactory implementation. 
+    Specifies ServerCnxnFactory implementation.
     This should be set to `NettyServerCnxnFactory` in order to use TLS based server communication.
     Default is `NIOServerCnxnFactory`.
 
@@ -857,6 +857,36 @@ property, when available, is noted below.
     Does not affect the limit defined by *flushDelay*.
     Default is 1000.
 
+* *requestThrottleLimit* :
+    (Java system property: **zookeeper.request_throttle_max_requests**)
+    **New in 3.6.0:**
+    The total number of outstanding requests allowed before the RequestThrottler starts stalling. When set to 0, throttling is disabled. The default is 0.
+
+* *requestThrottleStallTime* :
+    (Java system property: **zookeeper.request_throttle_stall_time**)
+    **New in 3.6.0:**
+    The maximum time (in milliseconds) for which a thread may wait to be notified that it may proceed processing a request. The default is 100.
+
+* *requestThrottleDropStale* :
+    (Java system property: **zookeeper.request_throttle_drop_stale**)
+    **New in 3.6.0:**
+    When enabled, the throttler will drop stale requests rather than issue them to the request pipeline. A stale request is a request sent by a connection that is now closed, and/or a request that will have a request latency higher than the sessionTimeout. The default is true.
+
+* *requestStaleLatencyCheck* :
+    (Java system property: **zookeeper.request_stale_latency_check**)
+    **New in 3.6.0:**
+    When enabled, a request is considered stale if the request latency is higher than its associated session timeout. Disabled by default.
+
+* *requestStaleConnectionCheck* :
+    (Java system property: **zookeeper.request_stale_connection_check**)
+    **New in 3.6.0:**
+    When enabled, a request is considered stale if the request's connection has closed. Enabled by default.
+
+* *zookeeper.request_throttler.shutdownTimeout* :
+    (Java system property only)
+    **New in 3.6.0:**
+    The time (in milliseconds) the RequestThrottler waits for the request queue to drain during shutdown before it shuts down forcefully. The default is 10000.  
+
 <a name="sc_clusterOptions"></a>
 
 #### Cluster Options
@@ -1108,20 +1138,20 @@ encryption/authentication/authorization performed by the service.
     (Java system property: **zookeeper.sslQuorum**)
     **New in 3.5.5:**
     Enables encrypted quorum communication. Default is `false`.
-       
+
 * *ssl.keyStore.location and ssl.keyStore.password* and *ssl.quorum.keyStore.location* and *ssl.quorum.keyStore.password* :
     (Java system properties: **zookeeper.ssl.keyStore.location** and **zookeeper.ssl.keyStore.password** and **zookeeper.ssl.quorum.keyStore.location** and **zookeeper.ssl.quorum.keyStore.password**)
     **New in 3.5.5:**
     Specifies the file path to a Java keystore containing the local
     credentials to be used for client and quorum TLS connections, and the
     password to unlock the file.
-    
+
 * *ssl.keyStore.type* and *ssl.quorum.keyStore.type* :
     (Java system properties: **zookeeper.ssl.keyStore.type** and **zookeeper.ssl.quorum.keyStore.type**)
     **New in 3.5.5:**
     Specifies the file format of client and quorum keystores. Values: JKS, PEM or null (detect by filename).    
     Default: null     
-    
+
 * *ssl.trustStore.location* and *ssl.trustStore.password* and *ssl.quorum.trustStore.location* and *ssl.quorum.trustStore.password* :
     (Java system properties: **zookeeper.ssl.trustStore.location** and **zookeeper.ssl.trustStore.password** and **zookeeper.ssl.quorum.trustStore.location** and **zookeeper.ssl.quorum.trustStore.password**)
     **New in 3.5.5:**
@@ -1146,7 +1176,7 @@ encryption/authentication/authorization performed by the service.
     **New in 3.5.5:**
     Specifies the enabled protocols in client and quorum TLS negotiation.
     Default: value of `protocol` property
-    
+
 * *ssl.ciphersuites* and *ssl.quorum.ciphersuites* :
     (Java system properties: **zookeeper.ssl.ciphersuites** and **zookeeper.ssl.quorum.ciphersuites**)
     **New in 3.5.5:**
@@ -1161,7 +1191,7 @@ encryption/authentication/authorization performed by the service.
     1. Use hardware keystore, loaded in using PKCS11 or something similar.
     2. You don't have access to the software keystore, but can retrieve an already-constructed SSLContext from their container.
     Default: null
-    
+
 * *ssl.hostnameVerification* and *ssl.quorum.hostnameVerification* :
     (Java system properties: **zookeeper.ssl.hostnameVerification** and **zookeeper.ssl.quorum.hostnameVerification**)
     **New in 3.5.5:**
@@ -1180,12 +1210,12 @@ encryption/authentication/authorization performed by the service.
     **New in 3.5.5:**
     Specifies whether Online Certificate Status Protocol is enabled in client and quorum TLS protocols.
     Default: false
-    
+
 * *ssl.clientAuth* and *ssl.quorum.clientAuth* :
     (Java system properties: **zookeeper.ssl.clientAuth** and **zookeeper.ssl.quorum.clientAuth**)
     **New in 3.5.5:**
     TBD
-    
+
 * *ssl.handshakeDetectionTimeoutMillis* and *ssl.quorum.handshakeDetectionTimeoutMillis* :
     (Java system properties: **zookeeper.ssl.handshakeDetectionTimeoutMillis** and **zookeeper.ssl.quorum.handshakeDetectionTimeoutMillis**)
     **New in 3.5.5:**
@@ -1468,20 +1498,20 @@ and quorum communication protocols.
 
 One keystore should be created for each ZK instance.
 
-In this example we generate a self-signed certificate and store it 
-together with the private key in `keystore.jks`. This is suitable for 
-testing purposes, but you probably need an official certificate to sign 
+In this example we generate a self-signed certificate and store it
+together with the private key in `keystore.jks`. This is suitable for
+testing purposes, but you probably need an official certificate to sign
 your keys in a production environment.
 
 Please note that the alias (`-alias`) and the distinguished name (`-dname`)
-must match the hostname of the machine that is associated with, otherwise 
+must match the hostname of the machine that is associated with, otherwise
 hostname verification won't work.
 
 ```
 keytool -genkeypair -alias $(hostname -f) -keyalg RSA -keysize 2048 -dname "cn=$(hostname -f)" -keypass password -keystore keystore.jks -storepass password
 ```
 
-2. Extract the signed public key (certificate) from keystore 
+2. Extract the signed public key (certificate) from keystore
 
 *This step might only necessary for self-signed certificates.*
 
@@ -1569,7 +1599,7 @@ and do another rolling restart
 ```
 sslQuorum=true
 portUnification=false
-``` 
+```
 
 
 <a name="sc_zkCommands"></a>
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index e15d189..cd0c0eb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -217,6 +217,11 @@ public class FinalRequestProcessor implements RequestProcessor {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("{}",request);
             }
+
+            if (request.isStale()) {
+                ServerMetrics.getMetrics().STALE_REPLIES.add(1);
+            }
+
             switch (request.type) {
             case OpCode.ping: {
                 lastOp = "PING";
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index fb7ce4d..48aadfc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -150,17 +150,31 @@ public class NIOServerCnxn extends ServerCnxn {
         requestInterestOpsUpdate();
     }
 
+    /**
+     * When a read on the socket fails, it is typically because the client closed
+     * the connection. In most cases, the client does this when the server doesn't
+     * respond within 2/3 of the session timeout. This possibly indicates a server
+     * health/performance issue, so we need to log it and keep track of the stat.
+     *
+     * @throws EndOfStreamException
+     */
+    private void handleFailedRead() throws EndOfStreamException {
+        setStale();
+        ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
+        throw new EndOfStreamException(
+                "Unable to read additional data from client,"
+                + " it probably closed the socket:"
+                + " address = " + sock.socket().getRemoteSocketAddress() + ","
+                + " session = 0x" + Long.toHexString(sessionId),
+                DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+    }
+
     /** Read the request payload (everything following the length prefix) */
     private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
             if (rc < 0) {
-                ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
-                throw new EndOfStreamException(
-                        "Unable to read additional data from client sessionid 0x"
-                        + Long.toHexString(sessionId)
-                        + ", likely client has closed socket",
-                        DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+                handleFailedRead();
             }
         }
 
@@ -318,12 +332,7 @@ public class NIOServerCnxn extends ServerCnxn {
             if (k.isReadable()) {
                 int rc = sock.read(incomingBuffer);
                 if (rc < 0) {
-                    ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
-                    throw new EndOfStreamException(
-                            "Unable to read additional data from client sessionid 0x"
-                            + Long.toHexString(sessionId)
-                            + ", likely client has closed socket",
-                            DisconnectReason.UNABLE_TO_READ_FROM_CLIENT);
+                    handleFailedRead();
                 }
                 if (incomingBuffer.remaining() == 0) {
                     boolean isPayload;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index d730a66..e4fdb4d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -39,6 +39,16 @@ import org.apache.zookeeper.txn.TxnHeader;
 public class Request {
     public final static Request requestOfDeath = new Request(null, 0, 0, 0, null, null);
 
+    // Considers a request stale if the request's connection has closed. Enabled
+    // by default.
+    private static volatile boolean staleConnectionCheck = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_stale_connection_check","true"));
+
+    // Considers a request stale if the request latency is higher than its
+    // associated session timeout. Disabled by default.
+    private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_stale_latency_check","false"));
+
     public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
         this.cnxn = cnxn;
         this.sessionId = sessionId;
@@ -133,6 +143,65 @@ public class Request {
         this.txn = txn;
     }
 
+    public ServerCnxn getConnection() {
+        return cnxn;
+    }
+
+    public static boolean getStaleLatencyCheck() {
+        return staleLatencyCheck;
+    }
+
+    public static void setStaleLatencyCheck(boolean check) {
+        staleLatencyCheck = check;
+    }
+
+    public static boolean getStaleConnectionCheck() {
+        return staleConnectionCheck;
+    }
+
+    public static void setStaleConnectionCheck(boolean check) {
+        staleConnectionCheck = check;
+    }
+
+    public boolean isStale() {
+        if (cnxn == null) {
+            return false;
+        }
+
+        // closeSession requests should be able to outlive the session in order
+        // to clean-up state.
+        if (type == OpCode.closeSession) {
+            return false;
+        }
+
+        if (staleConnectionCheck) {
+            // If the connection is closed, consider the request stale.
+            if (cnxn.isStale() || cnxn.isInvalid()) {
+                return true;
+            }
+        }
+
+        if (staleLatencyCheck) {
+            // If the request latency is higher than session timeout, consider
+            // the request stale.
+            long currentTime = Time.currentElapsedTime();
+            if ((currentTime - createTime) > cnxn.getSessionTimeout()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * A prior request was dropped on this request's connection and
+     * therefore this request must also be dropped to ensure correct
+     * ordering semantics.
+     */
+    public boolean mustDrop() {
+        return ((cnxn != null) && cnxn.isInvalid());
+    }
+
     /**
      * is the packet type a valid packet in zookeeper
      *
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
new file mode 100644
index 0000000..0e9772b
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.common.Time;
+
+/**
+ * When enabled, the RequestThrottler limits the number of outstanding requests
+ * currently submitted to the request processor pipeline. The throttler augments
+ * the limit imposed by the <code>globalOutstandingLimit</code> that is enforced
+ * by the connection layer ({@link NIOServerCnxn}, {@link NettyServerCnxn}).
+ *
+ * The connection layer limit applies backpressure against the TCP connection by
+ * disabling selection on connections once the request limit is reached. However,
+ * the connection layer always allows a connection to send at least one request
+ * before disabling selection on that connection. Thus, in a scenario with 40000
+ * client connections, the total number of requests inflight may be as high as
+ * 40000 even if the <code>globalOutstandingLimit</code> was set lower.
+ *
+ * The RequestThrottler addresses this issue by adding additional queueing. When
+ * enabled, client connections no longer submit requests directly to the request
+ * processor pipeline but instead to the RequestThrottler. The RequestThrottler
+ * is then responsible for issuing requests to the request processors, and
+ * enforces a separate <code>maxRequests</code> limit. If the total number of
+ * outstanding requests is higher than <code>maxRequests</code>, the throttler
+ * will repeatedly stall for up to <code>stallTime</code> milliseconds at a
+ * time until the count is back under the limit.
+ *
+ * The RequestThrottler can also optionally drop stale requests rather than
+ * submit them to the processor pipeline. A stale request is a request sent
+ * by a connection that is already closed, and/or a request whose latency
+ * will end up being higher than its associated session timeout. The notion
+ * of staleness is configurable; see {@link Request} for more details.
+ *
+ * To ensure ordering guarantees, if a request is ever dropped from a connection
+ * that connection is closed and flagged as invalid. All subsequent requests
+ * inflight from that connection are then dropped as well.
+ */
+public class RequestThrottler extends ZooKeeperCriticalThread {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottler.class);
+
+    private final LinkedBlockingQueue<Request> submittedRequests =
+        new LinkedBlockingQueue<Request>();
+
+    private final ZooKeeperServer zks;
+    private volatile boolean stopping;
+    private volatile boolean killed;
+
+    private static final String SHUTDOWN_TIMEOUT = "zookeeper.request_throttler.shutdownTimeout";
+    private static int shutdownTimeout = 10000;
+
+    static {
+        shutdownTimeout = Integer.getInteger(SHUTDOWN_TIMEOUT, 10000);
+        LOG.info("{} = {}", SHUTDOWN_TIMEOUT, shutdownTimeout);
+    }
+
+    /**
+     * The total number of outstanding requests allowed before the throttler
+     * starts stalling.
+     *
+     * When maxRequests = 0, throttling is disabled.
+     */
+    private static volatile int maxRequests =
+        Integer.getInteger("zookeeper.request_throttle_max_requests", 0);
+
+    /**
+     * The maximum time (in milliseconds) for which the throttler thread may
+     * wait to be notified that it may proceed processing a request.
+     */
+    private static volatile int stallTime =
+        Integer.getInteger("zookeeper.request_throttle_stall_time", 100);
+
+    /**
+     * When true, the throttler will drop stale requests rather than issue
+     * them to the request pipeline. A stale request is a request sent by
+     * a connection that is now closed, and/or a request that will have a
+     * request latency higher than the sessionTimeout. The staleness of
+     * a request is a tunable property; see {@link Request} for details.
+     */
+    private static volatile boolean dropStaleRequests = Boolean.parseBoolean(
+            System.getProperty("zookeeper.request_throttle_drop_stale", "true"));
+
+    public RequestThrottler(ZooKeeperServer zks) {
+        super("RequestThrottler", zks.getZooKeeperServerListener());
+        this.zks = zks;
+        this.stopping = false;
+        this.killed = false;
+    }
+
+    public static int getMaxRequests() {
+        return maxRequests;
+    }
+
+    public static void setMaxRequests(int requests) {
+        maxRequests = requests;
+    }
+
+    public static int getStallTime() {
+        return stallTime;
+    }
+
+    public static void setStallTime(int time) {
+        stallTime = time;
+    }
+
+    public static boolean getDropStaleRequests() {
+        return dropStaleRequests;
+    }
+
+    public static void setDropStaleRequests(boolean drop) {
+        dropStaleRequests = drop;
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                if (killed) {
+                    break;
+                }
+
+                Request request = submittedRequests.take();
+                if (Request.requestOfDeath == request) {
+                    break;
+                }
+
+                if (request.mustDrop()) {
+                    continue;
+                }
+
+                // Throttling is disabled when maxRequests = 0
+                if (maxRequests > 0) {
+                    while (!killed) {
+                        if (dropStaleRequests && request.isStale()) {
+                            // Note: this will close the connection
+                            dropRequest(request);
+                            ServerMetrics.getMetrics().STALE_REQUESTS_DROPPED.add(1);
+                            request = null;
+                            break;
+                        }
+                        if (zks.getInProcess() < maxRequests) {
+                            break;
+                        }
+                        throttleSleep(stallTime);
+                    }
+                }
+
+                if (killed) {
+                    break;
+                }
+
+                // A dropped stale request will be null
+                if (request != null) {
+                    if (request.isStale()) {
+                        ServerMetrics.getMetrics().STALE_REQUESTS.add(1);
+                    }
+                    zks.submitRequestNow(request);
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Unexpected interruption", e);
+        }
+        int dropped = drainQueue();
+        LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
+    }
+
+    private synchronized void throttleSleep(int stallTime) {
+        try {
+            ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
+            this.wait(stallTime);
+        } catch(InterruptedException ie) {
+            return;
+        }
+    }
+
+    @SuppressFBWarnings(value = "NN_NAKED_NOTIFY",
+            justification = "state change is in ZooKeeperServer.decInProgress() ")
+    public synchronized void throttleWake() {
+        this.notify();
+    }
+
+    private int drainQueue() {
+        // If the throttler shut down gracefully, the queue will be empty.
+        // However, if the shutdown time limit was reached and the throttler
+        // was killed, we have no other option than to drop all remaining
+        // requests on the floor.
+        int dropped = 0;
+        Request request;
+        LOG.info("Draining request throttler queue");
+        while ((request = submittedRequests.poll()) != null) {
+            dropped += 1;
+            dropRequest(request);
+        }
+        return dropped;
+    }
+
+    private void dropRequest(Request request) {
+        // Since we're dropping a request on the floor, we must mark the
+        // connection as invalid to ensure any future requests from this
+        // connection are also dropped in order to ensure ordering
+        // semantics.
+        ServerCnxn conn = request.getConnection();
+        if (conn != null) {
+            // Note: this will close the connection
+            conn.setInvalid();
+        }
+    }
+
+    public void submitRequest(Request request) {
+        if (stopping) {
+            LOG.debug("Shutdown in progress. Request cannot be processed");
+            dropRequest(request);
+        } else {
+            submittedRequests.add(request);
+        }
+    }
+
+    public int getInflight() {
+        return submittedRequests.size();
+    }
+
+    @SuppressFBWarnings("DM_EXIT")
+    public void shutdown() {
+        // Try to shutdown gracefully
+        LOG.info("Shutting down");
+        stopping = true;
+        submittedRequests.add(Request.requestOfDeath);
+        try {
+            this.join(shutdownTimeout);
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for {} to finish", this);
+        }
+
+        // Forcibly shutdown if necessary in order to ensure request
+        // queue is drained.
+        killed = true;
+        try {
+            this.join();
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while waiting for {} to finish", this);
+            //TODO apply ZOOKEEPER-575 and remove this line.
+            System.exit(ExitCode.UNEXPECTED_ERROR.getValue());
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 6953e3a..42d28e2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -68,8 +68,6 @@ public abstract class ServerCnxn implements Stats, Watcher {
      */
     boolean isOldClient = true;
 
-    private volatile boolean stale = false;
-
     AtomicLong outstandingCount = new AtomicLong();
 
     /** The ZooKeeperServer for this connection. May be null if the server
@@ -123,6 +121,22 @@ public abstract class ServerCnxn implements Stats, Watcher {
         this.zkServer = zkServer;
     }
 
+    /**
+     * Flag that indicates that this connection is known to be closed/closing
+     * and from which we can optionally ignore outstanding requests as part
+     * of request throttling. This flag may be false when a connection is
+     * actually closed (false negative), but should never be true when
+     * a connection is still alive (false positive).
+     */
+    private volatile boolean stale = false;
+
+    /**
+     * Flag that indicates that a request for this connection was previously
+     * dropped as part of request throttling and therefore all future requests
+     * must also be dropped to ensure ordering guarantees.
+     */
+    private volatile boolean invalid = false;
+
     abstract int getSessionTimeout();
 
     public void incrOutstandingAndCheckThrottle(RequestHeader h) {
@@ -276,6 +290,19 @@ public abstract class ServerCnxn implements Stats, Watcher {
         stale = true;
     }
 
+    public boolean isInvalid() {
+        return invalid;
+    }
+
+    public void setInvalid() {
+        if (!invalid) {
+            if (!stale) {
+                sendCloseSession();
+            }
+            invalid = true;
+        }
+    }
+
     protected void packetReceived(long bytes) {
         incrPacketsReceived();
         ServerStats serverStats = serverStats();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 366770b..ca05f94 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -220,6 +220,11 @@ public final class ServerMetrics {
         ACK_LATENCY = metricsContext.getSummarySet("ack_latency", DetailLevel.ADVANCED);
         PROPOSAL_COUNT = metricsContext.getCounter("proposal_count");
         QUIT_LEADING_DUE_TO_DISLOYAL_VOTER = metricsContext.getCounter("quit_leading_due_to_disloyal_voter");
+
+        STALE_REQUESTS = metricsContext.getCounter("stale_requests");
+        STALE_REQUESTS_DROPPED = metricsContext.getCounter("stale_requests_dropped");
+        STALE_REPLIES = metricsContext.getCounter("stale_replies");
+        REQUEST_THROTTLE_WAIT_COUNT = metricsContext.getCounter("request_throttle_wait_count");
     }
 
     /**
@@ -414,6 +419,11 @@ public final class ServerMetrics {
      */
     public final Counter ENSEMBLE_AUTH_SKIP;
 
+    public final Counter STALE_REQUESTS;
+    public final Counter STALE_REQUESTS_DROPPED;
+    public final Counter STALE_REPLIES;
+    public final Counter REQUEST_THROTTLE_WAIT_COUNT;
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index f5c770b..4258916 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -40,6 +40,7 @@ import java.util.function.BiConsumer;
 
 import javax.security.sasl.SaslException;
 
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
@@ -79,7 +80,6 @@ import org.apache.zookeeper.txn.TxnHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * This class implements a simple standalone ZooKeeperServer. It sets up the
  * following chain of RequestProcessors to process requests:
@@ -187,6 +187,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     // Connection throttling
     private BlueThrottle connThrottle;
 
+    @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC",
+            justification = "Internally the throttler has a BlockingQueue so " +
+                    "once the throttler is created and started, it is thread-safe")
+    private RequestThrottler requestThrottler;
+
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
     }
@@ -250,7 +255,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                            ZKDatabase zkDb, String initialConfig) {
         this(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, clientPortListenBacklog, zkDb, initialConfig);
         this.jvmPauseMonitor = jvmPauseMonitor;
-        if(jvmPauseMonitor != null) {
+        if (jvmPauseMonitor != null) {
             LOG.info("Added JvmPauseMonitor to server");
         }
     }
@@ -558,6 +563,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         startSessionTracker();
         setupRequestProcessors();
 
+        startRequestThrottler();
+
         registerJMX();
 
         startJvmPauseMonitor();
@@ -574,6 +581,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
+    protected void startRequestThrottler() {
+        // create the request throttler for this server and start it
+        requestThrottler = new RequestThrottler(this);
+        requestThrottler.start();
+    }
+
     protected void setupRequestProcessors() {
         RequestProcessor finalProcessor = new FinalRequestProcessor(this);
         RequestProcessor syncProcessor = new SyncRequestProcessor(this,
@@ -674,6 +687,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // subclasses will do their specific clean up
         unregisterMetrics();
 
+        if (requestThrottler != null) {
+            requestThrottler.shutdown();
+        }
+
         // Since sessionTracker and syncThreads poll we just have to
         // set running to false and they will detect it during the poll
         // interval.
@@ -735,12 +752,26 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     public void decInProcess() {
         requestsInProcess.decrementAndGet();
+        if (requestThrottler != null) {
+            requestThrottler.throttleWake();
+        }
     }
 
     public int getInProcess() {
         return requestsInProcess.get();
     }
 
+    public int getInflight() {
+        return requestThrottleInflight();
+    }
+
+    private int requestThrottleInflight() {
+        // without a throttler nothing is counted as in flight
+        return requestThrottler == null
+                ? 0
+                : requestThrottler.getInflight();
+    }
+
     /**
      * This structure is used to facilitate information sharing between PrepRP
      * and FinalRP.
@@ -910,6 +941,32 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public void submitRequest(Request si) {
+        enqueueRequest(si);
+    }
+
+    // Hands the request to the request throttler, first waiting for startup if no throttler exists yet.
+    public void enqueueRequest(Request si) {
+        if (requestThrottler == null) {
+            synchronized (this) {
+                try {
+                    // The throttler is not there yet: wait (re-checking every
+                    // second) until the server has left the INITIAL state.
+                    while (state == State.INITIAL) {
+                        wait(1000);
+                    }
+                } catch (InterruptedException e) {
+                    LOG.warn("Unexpected interruption", e);
+                    Thread.currentThread().interrupt(); // restore the flag for the caller
+                }
+                if (requestThrottler == null) {
+                    throw new RuntimeException("Not started");
+                }
+            }
+        }
+        requestThrottler.submitRequest(si);
+    }
+
+    public void submitRequestNow(Request si) {
         if (firstProcessor == null) {
             synchronized (this) {
                 try {
@@ -1224,7 +1281,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public boolean shouldThrottle(long outStandingCount) {
-        if (getGlobalOutstandingLimit() < getInProcess()) {
+        if (getGlobalOutstandingLimit() < getInflight()) {
             return outStandingCount > 0;
         }
         return false;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index 39f542f..92ceab8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -291,6 +291,37 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
         ZooKeeperServer.setFlushDelay(delay);
     }
 
+    // Request throttling settings
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getRequestThrottleLimit() {
+        return RequestThrottler.getMaxRequests();
+    }
+
+    public void setRequestThrottleLimit(int requests) {
+        RequestThrottler.setMaxRequests(requests);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public int getRequestThrottleStallTime() {
+        return RequestThrottler.getStallTime();
+    }
+
+    public void setRequestThrottleStallTime(int time) {
+        RequestThrottler.setStallTime(time);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////
+
+    public boolean getRequestThrottleDropStale() {
+        return RequestThrottler.getDropStaleRequests();
+    }
+
+    public void setRequestThrottleDropStale(boolean drop) {
+        RequestThrottler.setDropStaleRequests(drop);
+    }
+
     ///////////////////////////////////////////////////////////////////////////
 
     @Override
@@ -303,6 +334,14 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
         ZooKeeperServer.setMaxWriteQueuePollTime(delay);
     }
 
+    public boolean getRequestStaleLatencyCheck() {
+        return Request.getStaleLatencyCheck();
+    }
+
+    public void setRequestStaleLatencyCheck(boolean check) {
+        Request.setStaleLatencyCheck(check);
+    }
+
     ///////////////////////////////////////////////////////////////////////////
 
     @Override
@@ -314,4 +353,12 @@ public class ZooKeeperServerBean implements ZooKeeperServerMXBean, ZKMBeanInfo {
     public void setMaxBatchSize(int size) {
         ZooKeeperServer.setMaxBatchSize(size);
     }
+
+    public boolean getRequestStaleConnectionCheck() {
+        return Request.getStaleConnectionCheck();
+    }
+
+    public void setRequestStaleConnectionCheck(boolean check) {
+        Request.setStaleConnectionCheck(check);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index a4482d6..4c71eac 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -120,6 +120,21 @@ public interface ZooKeeperServerMXBean {
     public double getConnectionDecreaseRatio();
     public void setConnectionDecreaseRatio(double val);
 
+    public int getRequestThrottleLimit();
+    public void setRequestThrottleLimit(int requests);
+
+    public int getRequestThrottleStallTime();
+    public void setRequestThrottleStallTime(int time);
+
+    public boolean getRequestThrottleDropStale();
+    public void setRequestThrottleDropStale(boolean drop);
+
+    public boolean getRequestStaleLatencyCheck();
+    public void setRequestStaleLatencyCheck(boolean check);
+
+    public boolean getRequestStaleConnectionCheck();
+    public void setRequestStaleConnectionCheck(boolean check);
+
     /**
      * Reset packet and latency statistics 
      */
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
new file mode 100644
index 0000000..9eee926
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+
+public class RequestThrottlerTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(RequestThrottlerTest.class);
+
+    private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
+    private final static int TOTAL_REQUESTS = 5;
+    private final static int STALL_TIME = 5000;
+
+    // latch to hold requests in the PrepRequestProcessor to
+    // keep them from going down the pipeline to reach the final
+    // request processor, where the number of in process requests
+    // will be decreased
+    CountDownLatch resumeProcess = null;
+
+    // latch to make sure all requests are submitted
+    CountDownLatch submitted = null;
+
+    // latch to make sure all requests entered the pipeline
+    CountDownLatch entered = null;
+
+    ZooKeeperServer zks = null;
+    ServerCnxnFactory f = null;
+    ZooKeeper zk = null;
+
+    @Before
+    public void setup() throws Exception {
+        // start a server and create a client
+        File tmpDir = ClientBase.createTmpDir();
+        ClientBase.setupTestEnv();
+        zks = new TestZooKeeperServer(tmpDir, tmpDir, 3000);
+        final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+        f = ServerCnxnFactory.createFactory(PORT, -1);
+        f.startup(zks);
+        LOG.info("starting up the zookeeper server .. waiting");
+        Assert.assertTrue("waiting for server being up",
+                ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+        resumeProcess = null;
+        submitted = null;
+
+        zk = ClientBase.createZKClient(HOSTPORT);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // shut down the server and the client
+        if (null != zk) {
+            zk.close();
+        }
+
+        if (null != f) {
+            f.shutdown();
+        }
+        if (null != zks) {
+            zks.shutdown();
+        }
+    }
+
+    // TestZooKeeperServer
+    // 1. uses our version of PrepRequestProcessor, which can hold the request as long as we want
+    // 2. counts the number of submitted requests
+    class TestZooKeeperServer extends ZooKeeperServer {
+        public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
+            super(snapDir, logDir, tickTime);
+        }
+
+        // final <- sync <- prep chain, headed by the test prep processor that can hold requests
+        @Override
+        protected void setupRequestProcessors() {
+            SyncRequestProcessor sync = new SyncRequestProcessor(this, new FinalRequestProcessor(this));
+            sync.start();
+            TestPrepRequestProcessor prep = new TestPrepRequestProcessor(this, sync);
+            firstProcessor = prep;
+            prep.start();
+        }
+
+        @Override
+        public void submitRequest(Request si) {
+            if (submitted != null) {
+                submitted.countDown();
+            }
+            super.submitRequest(si);
+        }
+    }
+
+    class TestPrepRequestProcessor extends PrepRequestProcessor {
+        public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) {
+            super(zks, syncProcessor);
+        }
+
+        @Override
+        protected void pRequest(Request request) throws RequestProcessorException {
+            // hold the request here until the test releases the latch (or 20 seconds pass)
+            if (resumeProcess != null) {
+                try {
+                    resumeProcess.await(20, TimeUnit.SECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // stop holding, but keep the interrupt
+                }
+            }
+
+            if (entered != null) {
+                entered.countDown();
+            }
+
+            super.pRequest(request);
+        }
+    }
+
+    @Test
+    public void testRequestThrottler() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        // we only allow two requests in the pipeline
+        RequestThrottler.setMaxRequests(2);
+
+        RequestThrottler.setStallTime(STALL_TIME);
+        RequestThrottler.setDropStaleRequests(false);
+
+        // no requests can go through the pipeline unless we raise the latch
+        resumeProcess = new CountDownLatch(1);
+        submitted = new CountDownLatch(TOTAL_REQUESTS);
+        entered = new CountDownLatch(TOTAL_REQUESTS);
+
+        // send 5 requests asynchronously
+        for (int i = 0; i < TOTAL_REQUESTS; i++) {
+            zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " + i).getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+        }
+
+        // make sure the server received all 5 requests; fail right here if it did not
+        Assert.assertTrue("not all requests were submitted in time", submitted.await(5, TimeUnit.SECONDS));
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+        // but only two requests can get into the pipeline because of the throttler
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+        // let the requests go through the pipeline and the throttler will be woken up to allow more requests
+        // to enter the pipeline
+        resumeProcess.countDown();
+        Assert.assertTrue("not all requests entered the pipeline", entered.await(STALL_TIME, TimeUnit.MILLISECONDS));
+
+        metrics = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(TOTAL_REQUESTS, (long)metrics.get("prep_processor_request_queued"));
+    }
+
+    @Test
+    public void testDropStaleRequests() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        // we only allow two requests in the pipeline
+        RequestThrottler.setMaxRequests(2);
+
+        RequestThrottler.setStallTime(STALL_TIME);
+        RequestThrottler.setDropStaleRequests(true);
+
+        // no requests can go through the pipeline unless we raise the latch
+        resumeProcess = new CountDownLatch(1);
+        submitted = new CountDownLatch(TOTAL_REQUESTS);
+
+        // send 5 requests asynchronously
+        for (int i = 0; i < TOTAL_REQUESTS; i++) {
+            zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " + i).getBytes(),
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> {}, null);
+        }
+
+        // make sure the server received all 5 requests; fail right here if it did not
+        Assert.assertTrue("not all requests were submitted in time", submitted.await(5, TimeUnit.SECONDS));
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+
+        // but only two requests can get into the pipeline because of the throttler
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1L, (long)metrics.get("request_throttle_wait_count"));
+
+        for (ServerCnxn cnxn : f.cnxns) {
+            cnxn.setStale();
+        }
+        zk = null;
+
+        resumeProcess.countDown();
+        LOG.info("raise the latch");
+
+        // wait for the throttler to drain, but give up after a bounded time instead of hanging the run
+        for (int waited = 0; zks.getInflight() > 0 && waited < 4 * STALL_TIME; waited += 50) {
+            Thread.sleep(50);
+        }
+        Assert.assertEquals("requests still in flight after waiting", 0, zks.getInflight());
+
+        // the remaining 3 requests are dropped, but only the first one per connection is counted
+        metrics = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(2L, (long)metrics.get("prep_processor_request_queued"));
+        Assert.assertEquals(1, (long)metrics.get("stale_requests_dropped"));
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
index abe9aa0..c2e0b12 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/SessionTrackerTest.java
@@ -31,6 +31,8 @@ import org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Testing zk client session logic in sessiontracker
@@ -48,6 +50,7 @@ public class SessionTrackerTest extends ZKTestCase {
      */
     @Test(timeout = 20000)
     public void testAddSessionAfterSessionExpiry() throws Exception {
+        RequestThrottler.setMaxRequests(0);
         ZooKeeperServer zks = setupSessionTracker();
 
         latch = new CountDownLatch(1);
@@ -127,6 +130,7 @@ public class SessionTrackerTest extends ZKTestCase {
         // setup session tracker
         zks.createSessionTracker();
         zks.startSessionTracker();
+        zks.startRequestThrottler();
         return zks;
     }