You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ok...@apache.org on 2017/11/07 00:48:54 UTC

kafka git commit: KAFKA-5894; add the notion of max inflight requests to async ZooKeeperClient

Repository: kafka
Updated Branches:
  refs/heads/trunk 6f96d7f17 -> 58138126c


KAFKA-5894; add the notion of max inflight requests to async ZooKeeperClient

ZooKeeperClient is a zookeeper client that encourages pipelined requests to zookeeper. We want to add the notion of max inflight requests to the client for several reasons:
1. to bound memory overhead associated with async requests on the client.
2. to not overwhelm the zookeeper ensemble with a burst of requests.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>

Closes #3860 from onurkaraman/KAFKA-5894


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58138126
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58138126
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58138126

Branch: refs/heads/trunk
Commit: 58138126ce93efc48291fe444f554eabdf4a5609
Parents: 6f96d7f
Author: Onur Karaman <ok...@linkedin.com>
Authored: Mon Nov 6 16:46:31 2017 -0800
Committer: Onur Karaman <ok...@linkedin.com>
Committed: Mon Nov 6 16:46:31 2017 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaConfig.scala   |  6 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala | 23 ++++--
 .../unit/kafka/server/KafkaConfigTest.scala     |  1 +
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala |  6 +-
 .../kafka/zookeeper/ZooKeeperClientTest.scala   | 84 ++++++++++++++------
 6 files changed, 87 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d336499..e24659c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -44,6 +44,7 @@ object Defaults {
   val ZkSessionTimeoutMs = 6000
   val ZkSyncTimeMs = 2000
   val ZkEnableSecureAcls = false
+  val ZkMaxInFlightRequests = 10
 
   /** ********* General Configuration ***********/
   val BrokerIdGenerationEnable = true
@@ -231,6 +232,7 @@ object KafkaConfig {
   val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
   val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
   val ZkEnableSecureAclsProp = "zookeeper.set.acl"
+  val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
   /** ********* General Configuration ***********/
   val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
   val MaxReservedBrokerIdProp = "reserved.broker.max.id"
@@ -418,6 +420,7 @@ object KafkaConfig {
   val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
+  val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking."
   /** ********* General Configuration ***********/
   val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
   val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@@ -695,6 +698,7 @@ object KafkaConfig {
       .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
       .define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
+      .define(ZkMaxInFlightRequestsProp, INT, Defaults.ZkMaxInFlightRequests, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
 
       /** ********* General Configuration ***********/
       .define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
@@ -916,7 +920,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
   val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
-
+  val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
   /** ********* General Configuration ***********/
   val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
   val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)

http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8111ff..dff83db 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -222,7 +222,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
 
         val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
-          config.zkConnectionTimeoutMs, new StateChangeHandler {
+          config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, new StateChangeHandler {
             override def onReconnectionTimeout(): Unit = {
               error("Reconnection timeout.")
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 0ff34c0..149e7eb 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -18,7 +18,7 @@
 package kafka.zookeeper
 
 import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit}
 
 import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
 import kafka.utils.Logging
@@ -37,9 +37,13 @@ import scala.collection.JavaConverters._
  * @param connectString comma separated host:port pairs, each corresponding to a zk server
  * @param sessionTimeoutMs session timeout in milliseconds
  * @param connectionTimeoutMs connection timeout in milliseconds
+ * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
  * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
  */
-class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
+class ZooKeeperClient(connectString: String,
+                      sessionTimeoutMs: Int,
+                      connectionTimeoutMs: Int,
+                      maxInFlightRequests: Int,
                       stateChangeHandler: StateChangeHandler) extends Logging {
   this.logIdent = "[ZooKeeperClient] "
   private val initializationLock = new ReentrantReadWriteLock()
@@ -47,6 +51,7 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
   private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
   private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
+  private val inFlightRequests = new Semaphore(maxInFlightRequests)
 
   info(s"Initializing a new session to $connectString.")
   @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
@@ -81,9 +86,17 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
       val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
 
       requests.foreach { request =>
-        send(request) { response =>
-          responseQueue.add(response)
-          countDownLatch.countDown()
+        inFlightRequests.acquire()
+        try {
+          send(request) { response =>
+            responseQueue.add(response)
+            inFlightRequests.release()
+            countDownLatch.countDown()
+          }
+        } catch {
+          case e: Throwable =>
+            inFlightRequests.release()
+            throw e
         }
       }
       countDownLatch.await()

http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index fabd9f7..9c459d8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -539,6 +539,7 @@ class KafkaConfigTest {
         case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
+        case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
 
         case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 8d064f8..77ac748 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import kafka.common.TopicAndPartition
-import kafka.utils.ZkUtils
+import kafka.server.Defaults
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -36,7 +36,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Defaults.ZkMaxInFlightRequests, null)
     zkClient = new KafkaZkClient(zooKeeperClient, false)
   }
 
@@ -100,7 +100,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
 
     // create a topic path
-    zkClient.createRecursive(ZkUtils.getTopicPath(topic))
+    zkClient.createRecursive(TopicZNode.path(topic))
 
     val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
     assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))

http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index d595221..50a065f 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,8 @@ package kafka.zookeeper
 import java.net.UnknownHostException
 import java.nio.charset.StandardCharsets
 import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
 import javax.security.auth.login.Configuration
 
 import kafka.zk.ZooKeeperTestHarness
@@ -41,23 +42,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test(expected = classOf[UnknownHostException])
   def testUnresolvableConnectString(): Unit = {
-    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
+    new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, null)
   }
 
   @Test(expected = classOf[ZooKeeperClientTimeoutException])
   def testConnectionTimeout(): Unit = {
     zookeeper.shutdown()
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, null)
   }
 
   @Test
   def testConnection(): Unit = {
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
   }
 
   @Test
   def testDeleteNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
     assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
   }
@@ -65,7 +66,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testDeleteExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
@@ -74,7 +75,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testExistsNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
   }
@@ -82,7 +83,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testExistsExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
@@ -91,7 +92,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetDataNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
   }
@@ -100,7 +101,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testGetDataExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val data = bytes
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
       CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -111,7 +112,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testSetDataNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
     assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
   }
@@ -120,7 +121,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   def testSetDataExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
     val data = bytes
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -133,7 +134,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testGetAclNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
   }
@@ -141,7 +142,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testGetAclExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
     val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
@@ -152,14 +153,14 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testSetAclNonExistentZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
     assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
   }
 
   @Test
   def testGetChildrenNonExistentZNode(): Unit = {
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
     assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
   }
@@ -167,7 +168,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testGetChildrenExistingZNode(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -183,7 +184,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
     val child2 = "child2"
     val child1Path = mockPath + "/" + child1
     val child2Path = mockPath + "/" + child2
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -202,7 +203,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testPipelinedGetData(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     val createResponses = createRequests.map(zooKeeperClient.handleRequest)
     createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
@@ -219,7 +220,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testMixedPipeline(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
       ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
     assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -234,7 +235,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForCreation(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleCreation(): Unit = {
@@ -255,7 +256,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForDeletion(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleDeletion(): Unit = {
@@ -278,7 +279,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChangeHandlerForDataChange(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
       override def handleDataChange(): Unit = {
@@ -301,7 +302,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
   @Test
   def testZNodeChildChangeHandlerForChildChange(): Unit = {
     import scala.collection.JavaConverters._
-    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
       override def handleChildChange(): Unit = {
@@ -331,9 +332,42 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
         stateChangeHandlerCountDownLatch.countDown()
       }
     }
-    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
+    new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, stateChangeHandler)
     assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
+  @Test
+  def testConnectionLossRequestTermination(): Unit = {
+    val batchSize = 10
+    val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, null)
+    zookeeper.shutdown()
+    val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
+    val countDownLatch = new CountDownLatch(1)
+    val running = new AtomicBoolean(true)
+    val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
+    val requestThread = new Thread {
+      override def run(): Unit = {
+        while (running.get()) {
+          val responses = zooKeeperClient.handleRequests(requests)
+          val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
+          if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
+            responses.foreach(unexpectedResponses.add)
+          if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
+            running.set(false)
+        }
+        countDownLatch.countDown()
+      }
+    }
+    requestThread.start()
+    val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
+    if (!requestThreadTerminated) {
+      running.set(false)
+      requestThread.join(5000)
+      fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
+    } else if (!unexpectedResponses.isEmpty) {
+      fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
+    }
+  }
+
   private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
 }