You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:24:04 UTC
[kafka] branch 1.1 updated: KAFKA-6111: Improve test coverage of
KafkaZkClient, fix bugs found by new tests;
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 873b28e KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
873b28e is described below
commit 873b28ea95345510365959c94d0c5fbf9756ca86
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Thu Mar 1 18:12:52 2018 -0800
KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 576 ++++++++++++++++++++-
2 files changed, 555 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6f2e79e..a5b6e25 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
- retryRequestUntilConnected(setDataRequest)
+ val response = retryRequestUntilConnected(setDataRequest)
+ response.maybeThrow()
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def deleteLogDirEventNotifications(): Unit = {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
- deleteLogDirEventNotifications(getChildrenResponse.children)
+ deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
*/
package kafka.zk
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
import scala.util.Random
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
+ private val topic1 = "topic1"
+ private val topic2 = "topic2"
+
+ val topicPartition10 = new TopicPartition(topic1, 0)
+ val topicPartition11 = new TopicPartition(topic1, 1)
+ val topicPartition20 = new TopicPartition(topic2, 0)
+ val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+ var otherZkClient: KafkaZkClient = _
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (otherZkClient != null)
+ otherZkClient.close()
+ super.tearDown()
+ }
+
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
// test with non-existing topic
+ assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
@@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
+ assertTrue(zkClient.topicExists(topic1))
+
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -215,6 +249,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testIsrChangeNotificationGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+
+ assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+ // A partition can have multiple notifications
+ assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+ zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+ }
+
+ @Test
+ def testIsrChangeNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+ zkClient.propagateIsrChanges(Set(topicPartition11))
+
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ // Should not fail if called on a non-existent notification
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+ assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+ zkClient.deleteIsrChangeNotifications()
+ assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+ }
+
+ @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -238,6 +309,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testLogDirGetters(): Unit = {
+ assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+ Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+ Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ zkClient.propagateLogDirEvent(brokerId)
+
+ assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.propagateLogDirEvent(brokerId)
+
+ val anotherBrokerId = 4
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+ assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+ assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+ }
+
+ @Test
+ def testLogDirEventNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ val anotherBrokerId = 4
+
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications()
+ assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+ }
+
+ @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicPathMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testDeletePath(): Unit = {
+ val path = "/a/b/c"
+ zkClient.createRecursive(path)
+ zkClient.deletePath(path)
+ assertFalse(zkClient.pathExists(path))
+ }
+
+ @Test
+ def testDeleteTopicZNode(): Unit = {
+ zkClient.deleteTopicZNode(topic1)
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.deleteTopicZNode(topic1)
+ assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+ }
+ @Test
+ def testDeleteTopicPathMethods() {
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
+ private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+ assertTrue(zkClient.pathExists(expectedPath))
+ assertEquals(Some(data), dataAsString(expectedPath))
+ }
+
+ @Test
+ def testCreateTokenChangeNotification(): Unit = {
+ intercept[NoNodeException] {
+ zkClient.createTokenChangeNotification("delegationToken")
+ }
+ zkClient.createDelegationTokenPaths()
+
+ zkClient.createTokenChangeNotification("delegationToken")
+ assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ }
+
@Test
def testEntityConfigManagementMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
-
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, "1024")
- logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerRegistrationMethods() {
+ def testCreateConfigChangeNotification(): Unit = {
+ intercept[NoNodeException] {
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+ }
+
+ zkClient.createTopLevelPaths()
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+ assertPathExistenceAndData(
+ "/config/changes/config_change_0000000000",
+ """{"version":2,"entity_path":"/config/topics/topic1"}""")
+ }
+
+ private def createLogProps(bytesProp: Int): Properties = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps
+ }
+
+ private val logProps = createLogProps(1024)
+
+ @Test
+ def testGetLogConfigs(): Unit = {
+ val emptyConfig = LogConfig(Collections.emptyMap())
+ assertEquals("Non existent config, no defaults",
+ (Map(topic1 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+ val logProps2 = createLogProps(2048)
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals("One existing and one non-existent topic",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+ assertEquals("Two existing topics",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ val logProps1WithMoreValues = createLogProps(1024)
+ logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+ logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+ assertEquals("Config with defaults",
+ (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1),
+ Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+ }
+
+ private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+ rack: Option[String] = None): BrokerInfo =
+ BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+ (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+ @Test
+ def testRegisterBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
- val brokerInfo = BrokerInfo(Broker(1,
- Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
- rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+ val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+ // Node exists, owned by current session - no error, no update
+ zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+ // Other client tries to register broker with same id causes failure, info is not changed in ZK
+ intercept[NodeExistsException] {
+ otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ }
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ }
+
+ @Test
+ def testGetBrokerMethods(): Unit = {
+ zkClient.createTopLevelPaths()
+
+ assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+ assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+ assertEquals(None, zkClient.getBroker(0))
+
+ val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+ val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+ zkClient.registerBrokerInZk(brokerInfo1)
+ otherZkClient.registerBrokerInZk(brokerInfo0)
+
+ assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+ assertEquals(
+ Seq(brokerInfo0.broker, brokerInfo1.broker),
+ zkClient.getAllBrokersInCluster
+ )
+ assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+ }
+
+ @Test
+ def testUpdateBrokerInfo(): Unit = {
+ zkClient.createTopLevelPaths()
+
+ // Updating info of a broker not existing in ZK fails
+ val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ intercept[NoNodeException]{
+ zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ }
+
+ zkClient.registerBrokerInZk(originalBrokerInfo)
+
+ val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+ assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+ // Other ZK clients can update info
+ otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+ }
+
+ private def statWithVersion(version: Int): Stat = {
+ val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ stat.setVersion(version)
+ stat
+ }
+
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(0, 0)
+
+ val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+ leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+ private def checkUpdateLeaderAndIsrResult(
+ expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+ expectedPartitionsToRetry: Seq[TopicPartition],
+ expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+ actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+ val failedPartitionsExcerpt =
+ actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+ assertEquals("Permanently failed updates do not match expected",
+ expectedFailedPartitions, failedPartitionsExcerpt)
+ assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+ expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+ assertEquals("Successful updates do not match expected",
+ expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+ }
+
+ @Test
+ def testUpdateLeaderAndIsr(): Unit = {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Non-existing topicPartitions
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ mutable.ArrayBuffer.empty,
+ Map(
+ topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+ topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+ zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 1, zkVersion = 1),
+ mutable.ArrayBuffer.empty,
+ Map.empty,
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+ // Try to update with wrong ZK version
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ ArrayBuffer(topicPartition10, topicPartition11),
+ Map.empty,
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+ // Trigger successful, to be retried and failed partitions in same call
+ val mixedState = Map(
+ topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+ topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+ topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+ ArrayBuffer(topicPartition11),
+ Map(
+ topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+ zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+ }
+
+ private def checkGetDataResponse(
+ leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+ topicPartition: TopicPartition,
+ response: GetDataResponse): Unit = {
+ val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+ assertEquals(Code.OK, response.resultCode)
+ assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+ assertEquals(Some(topicPartition), response.ctx)
+ assertEquals(
+ Some(leaderIsrAndControllerEpochs(topicPartition)),
+ TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+ }
+
+ private def eraseMetadata(response: CreateResponse): CreateResponse =
+ response.copy(metadata = ResponseMetadata(0, 0))
+
+ @Test
+ def testGetTopicsAndPartitions(): Unit = {
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.createRecursive(TopicZNode.path(topic2))
+ assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+ }
+
+ @Test
+ def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ assertEquals(
+ Seq(
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ .map(eraseMetadata).toList)
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+ // Trying to create existing topicPartition states fails
+ assertEquals(
+ Seq(
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ null, ResponseMetadata(0, 0)),
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ null, ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+ }
+
+ @Test
+ def testSetTopicPartitionStatesRaw(): Unit = {
+
+ def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+ topicPartitions.map { topicPartition =>
+ SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+ Some(topicPartition), stat, ResponseMetadata(0, 0))
+ }
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Trying to set non-existing topicPartition's data results in NONODE responses
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+ zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+ _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+ zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+ eraseMetadataAndStat}.toList)
+
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+
+ // Other ZK client can also write the state of a partition
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+ otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+ eraseMetadataAndStat}.toList)
+ }
+
+ @Test
+ def testReassignPartitionsInProgress(): Unit = {
+ assertFalse(zkClient.reassignPartitionsInProgress)
+ zkClient.createRecursive(ReassignPartitionsZNode.path)
+ assertTrue(zkClient.reassignPartitionsInProgress)
+ }
+
+ @Test
+ def testGetTopicPartitionStates(): Unit = {
+ assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+ assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(
+ initialLeaderIsrAndControllerEpochs,
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+ )
+
+ assertEquals(
+ Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionState(topicPartition10)
+ )
+
+ assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+ val notExistingPartition = new TopicPartition(topic1, 2)
+ assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+ assertEquals(
+ Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+ )
+
+ assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+ assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+ }
+
+ private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+ val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+ response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+ }
+
+ @Test
+ def testControllerEpochMethods(): Unit = {
+ assertEquals(None, zkClient.getControllerEpoch)
+
+ assertEquals("Setting non existing nodes should return NONODE results",
+ SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+ assertEquals("Creating non existing nodes is OK",
+ CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+ CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+ assertEquals("Updating existing nodes is OK",
+ SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Updating with wrong ZK version returns BADVERSION",
+ SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ }
+
+ @Test
+ def testControllerManagementMethods(): Unit = {
+ // No controller
+ assertEquals(None, zkClient.getControllerId)
+ // Create controller
+ zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+ assertEquals(Some(1), zkClient.getControllerId)
+ zkClient.deleteController()
+ assertEquals(None, zkClient.getControllerId)
+ }
+
+ @Test
+ def testZNodeChangeHandlerForDataChange(): Unit = {
+ val mockPath = "/foo"
+
+ val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+ val zNodeChangeHandler = new ZNodeChangeHandler {
+ override def handleCreation(): Unit = {
+ znodeChangeHandlerCountDownLatch.countDown()
+ }
+
+ override val path: String = mockPath
+ }
+
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+ zkClient.createRecursive(mockPath)
+ assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
- val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+ assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+ //test deleting token
+ assertTrue(zkClient.deleteDelegationToken(tokenId))
+ assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.