You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ok...@apache.org on 2017/11/07 00:48:54 UTC
kafka git commit: KAFKA-5894; add the notion of max inflight requests to async ZooKeeperClient
Repository: kafka
Updated Branches:
refs/heads/trunk 6f96d7f17 -> 58138126c
KAFKA-5894; add the notion of max inflight requests to async ZooKeeperClient
ZooKeeperClient is a ZooKeeper client that encourages pipelined requests to ZooKeeper. We want to add the notion of max in-flight requests to the client for two reasons:
1. to bound memory overhead associated with async requests on the client.
2. to not overwhelm the zookeeper ensemble with a burst of requests.
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ted Yu <yu...@gmail.com>, Jun Rao <ju...@gmail.com>, Manikumar Reddy <ma...@gmail.com>
Closes #3860 from onurkaraman/KAFKA-5894
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58138126
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58138126
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58138126
Branch: refs/heads/trunk
Commit: 58138126ce93efc48291fe444f554eabdf4a5609
Parents: 6f96d7f
Author: Onur Karaman <ok...@linkedin.com>
Authored: Mon Nov 6 16:46:31 2017 -0800
Committer: Onur Karaman <ok...@linkedin.com>
Committed: Mon Nov 6 16:46:31 2017 -0800
----------------------------------------------------------------------
.../main/scala/kafka/server/KafkaConfig.scala | 6 +-
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../scala/kafka/zookeeper/ZooKeeperClient.scala | 23 ++++--
.../unit/kafka/server/KafkaConfigTest.scala | 1 +
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 6 +-
.../kafka/zookeeper/ZooKeeperClientTest.scala | 84 ++++++++++++++------
6 files changed, 87 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index d336499..e24659c 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -44,6 +44,7 @@ object Defaults {
val ZkSessionTimeoutMs = 6000
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false
+ val ZkMaxInFlightRequests = 10
/** ********* General Configuration ***********/
val BrokerIdGenerationEnable = true
@@ -231,6 +232,7 @@ object KafkaConfig {
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
+ val ZkMaxInFlightRequestsProp = "zookeeper.max.in.flight.requests"
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableProp = "broker.id.generation.enable"
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
@@ -418,6 +420,7 @@ object KafkaConfig {
val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
+ val ZkMaxInFlightRequestsDoc = "The maximum number of unacknowledged requests the client will send to Zookeeper before blocking."
/** ********* General Configuration ***********/
val BrokerIdGenerationEnableDoc = s"Enable automatic broker id generation on the server. When enabled the value configured for $MaxReservedBrokerIdProp should be reviewed."
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
@@ -695,6 +698,7 @@ object KafkaConfig {
.define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)
+ .define(ZkMaxInFlightRequestsProp, INT, Defaults.ZkMaxInFlightRequests, atLeast(1), HIGH, ZkMaxInFlightRequestsDoc)
/** ********* General Configuration ***********/
.define(BrokerIdGenerationEnableProp, BOOLEAN, Defaults.BrokerIdGenerationEnable, MEDIUM, BrokerIdGenerationEnableDoc)
@@ -916,7 +920,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).map(_.toInt).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
-
+ val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(KafkaConfig.BrokerIdGenerationEnableProp)
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index f8111ff..dff83db 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -222,7 +222,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
val zooKeeperClient = new ZooKeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
- config.zkConnectionTimeoutMs, new StateChangeHandler {
+ config.zkConnectionTimeoutMs, config.zkMaxInFlightRequests, new StateChangeHandler {
override def onReconnectionTimeout(): Unit = {
error("Reconnection timeout.")
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 0ff34c0..149e7eb 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -18,7 +18,7 @@
package kafka.zookeeper
import java.util.concurrent.locks.{ReentrantLock, ReentrantReadWriteLock}
-import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, TimeUnit}
+import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap, CountDownLatch, Semaphore, TimeUnit}
import kafka.utils.CoreUtils.{inLock, inReadLock, inWriteLock}
import kafka.utils.Logging
@@ -37,9 +37,13 @@ import scala.collection.JavaConverters._
* @param connectString comma separated host:port pairs, each corresponding to a zk server
* @param sessionTimeoutMs session timeout in milliseconds
* @param connectionTimeoutMs connection timeout in milliseconds
+ * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking.
* @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
*/
-class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int,
+class ZooKeeperClient(connectString: String,
+ sessionTimeoutMs: Int,
+ connectionTimeoutMs: Int,
+ maxInFlightRequests: Int,
stateChangeHandler: StateChangeHandler) extends Logging {
this.logIdent = "[ZooKeeperClient] "
private val initializationLock = new ReentrantReadWriteLock()
@@ -47,6 +51,7 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
private val isConnectedOrExpiredCondition = isConnectedOrExpiredLock.newCondition()
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
+ private val inFlightRequests = new Semaphore(maxInFlightRequests)
info(s"Initializing a new session to $connectString.")
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
@@ -81,9 +86,17 @@ class ZooKeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
val responseQueue = new ArrayBlockingQueue[Req#Response](requests.size)
requests.foreach { request =>
- send(request) { response =>
- responseQueue.add(response)
- countDownLatch.countDown()
+ inFlightRequests.acquire()
+ try {
+ send(request) { response =>
+ responseQueue.add(response)
+ inFlightRequests.release()
+ countDownLatch.countDown()
+ }
+ } catch {
+ case e: Throwable =>
+ inFlightRequests.release()
+ throw e
}
}
countDownLatch.await()
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index fabd9f7..9c459d8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -539,6 +539,7 @@ class KafkaConfigTest {
case KafkaConfig.ZkConnectionTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkSyncTimeMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.ZkEnableSecureAclsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
+ case KafkaConfig.ZkMaxInFlightRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.BrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.NumNetworkThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 8d064f8..77ac748 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -17,7 +17,7 @@
package kafka.zk
import kafka.common.TopicAndPartition
-import kafka.utils.ZkUtils
+import kafka.server.Defaults
import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.TopicPartition
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -36,7 +36,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp()
- zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Defaults.ZkMaxInFlightRequests, null)
zkClient = new KafkaZkClient(zooKeeperClient, false)
}
@@ -100,7 +100,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
// create a topic path
- zkClient.createRecursive(ZkUtils.getTopicPath(topic))
+ zkClient.createRecursive(TopicZNode.path(topic))
val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))
http://git-wip-us.apache.org/repos/asf/kafka/blob/58138126/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index d595221..50a065f 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -19,7 +19,8 @@ package kafka.zookeeper
import java.net.UnknownHostException
import java.nio.charset.StandardCharsets
import java.util.UUID
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, TimeUnit}
import javax.security.auth.login.Configuration
import kafka.zk.ZooKeeperTestHarness
@@ -41,23 +42,23 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test(expected = classOf[UnknownHostException])
def testUnresolvableConnectString(): Unit = {
- new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null)
+ new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, Int.MaxValue, null)
}
@Test(expected = classOf[ZooKeeperClientTimeoutException])
def testConnectionTimeout(): Unit = {
zookeeper.shutdown()
- new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null)
+ new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, Int.MaxValue, null)
}
@Test
def testConnection(): Unit = {
- new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
}
@Test
def testDeleteNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode)
}
@@ -65,7 +66,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testDeleteExistingZNode(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1))
@@ -74,7 +75,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testExistsNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode)
}
@@ -82,7 +83,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testExistsExistingZNode(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath))
@@ -91,7 +92,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetDataNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode)
}
@@ -100,7 +101,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testGetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala,
CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -111,7 +112,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testSetDataNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1))
assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode)
}
@@ -120,7 +121,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
def testSetDataExistingZNode(): Unit = {
import scala.collection.JavaConverters._
val data = bytes
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -133,7 +134,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetAclNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode)
}
@@ -141,7 +142,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetAclExistingZNode(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath))
@@ -152,14 +153,14 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testSetAclNonExistentZNode(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1))
assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode)
}
@Test
def testGetChildrenNonExistentZNode(): Unit = {
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath))
assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode)
}
@@ -167,7 +168,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testGetChildrenExistingZNode(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -183,7 +184,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
val child2 = "child2"
val child1Path = mockPath + "/" + child1
val child2Path = mockPath + "/" + child2
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -202,7 +203,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testPipelinedGetData(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
val createResponses = createRequests.map(zooKeeperClient.handleRequest)
createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode))
@@ -219,7 +220,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testMixedPipeline(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte],
ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT))
assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)
@@ -234,7 +235,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForCreation(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleCreation(): Unit = {
@@ -255,7 +256,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDeletion(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDeletion(): Unit = {
@@ -278,7 +279,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChangeHandlerForDataChange(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChangeHandler = new ZNodeChangeHandler {
override def handleDataChange(): Unit = {
@@ -301,7 +302,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@Test
def testZNodeChildChangeHandlerForChildChange(): Unit = {
import scala.collection.JavaConverters._
- val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, null)
val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
override def handleChildChange(): Unit = {
@@ -331,9 +332,42 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
stateChangeHandlerCountDownLatch.countDown()
}
}
- new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
+ new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, Int.MaxValue, stateChangeHandler)
assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
+ @Test
+ def testConnectionLossRequestTermination(): Unit = {
+ val batchSize = 10
+ val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 2, null)
+ zookeeper.shutdown()
+ val requests = (1 to batchSize).map(i => GetDataRequest(s"/$i"))
+ val countDownLatch = new CountDownLatch(1)
+ val running = new AtomicBoolean(true)
+ val unexpectedResponses = new ArrayBlockingQueue[GetDataResponse](batchSize)
+ val requestThread = new Thread {
+ override def run(): Unit = {
+ while (running.get()) {
+ val responses = zooKeeperClient.handleRequests(requests)
+ val suffix = responses.dropWhile(response => response.resultCode != Code.CONNECTIONLOSS)
+ if (!suffix.forall(response => response.resultCode == Code.CONNECTIONLOSS))
+ responses.foreach(unexpectedResponses.add)
+ if (!unexpectedResponses.isEmpty || suffix.nonEmpty)
+ running.set(false)
+ }
+ countDownLatch.countDown()
+ }
+ }
+ requestThread.start()
+ val requestThreadTerminated = countDownLatch.await(30, TimeUnit.SECONDS)
+ if (!requestThreadTerminated) {
+ running.set(false)
+ requestThread.join(5000)
+ fail("Failed to receive a CONNECTIONLOSS response code after zookeeper has shutdown.")
+ } else if (!unexpectedResponses.isEmpty) {
+ fail(s"Received an unexpected non-CONNECTIONLOSS response code after a CONNECTIONLOSS response code from a single batch: $unexpectedResponses")
+ }
+ }
+
private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8)
}