You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 01:48:35 UTC
[kafka] 03/09: Additional tests to improve test coverage of
KafkaZkClient.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d0eb552a891ecd99150ddf4b89985525806c6e31
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:52:46 2018 +0100
Additional tests to improve test coverage of KafkaZkClient.
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 558 ++++++++++++++++++++-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 18 +-
2 files changed, 547 insertions(+), 29 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
*/
package kafka.zk
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -30,16 +31,30 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
import org.junit.Test
-
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
import scala.util.Random
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.zookeeper.data.Stat
+
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
+ private val topic1 = "topic1"
+ private val topic2 = "topic2"
+
+ val topicPartition10 = new TopicPartition(topic1, 0)
+ val topicPartition11 = new TopicPartition(topic1, 1)
+ val topicPartition20 = new TopicPartition(topic2, 0)
+ val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -90,17 +105,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
// test with non-existing topic
+ assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
+ val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- new TopicPartition(topic1, 0) -> Seq(0, 1),
+ topicPartition -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -108,6 +124,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
+ assertTrue(zkClient.topicExists(topic1))
+
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -215,6 +233,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testIsrChangeNotificationGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+
+ assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+ // A partition can have multiple notifications
+ assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+ zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+ }
+
+ @Test
+ def testIsrChangeNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+ zkClient.propagateIsrChanges(Set(topicPartition11))
+
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ // Should not fail if called on a non-existent notification
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+ assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+ zkClient.deleteIsrChangeNotifications()
+ assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+ }
+
+ @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -238,6 +293,52 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testLogDirGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ zkClient.propagateLogDirEvent(brokerId)
+
+ assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.propagateLogDirEvent(brokerId)
+
+ val anotherBrokerId = 4
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+ assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+ assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+ }
+
+ @Test
+ def testLogDirEventNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ val anotherBrokerId = 4
+
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications()
+ assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+ }
+
+ @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -377,10 +478,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicPathMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testDeletePath() {
+ val path = "/a/b/c"
+ zkClient.createRecursive(path)
+ zkClient.deletePath(path)
+ assertFalse(zkClient.pathExists(path))
+ }
+ @Test
+ def testDeleteTopicZNode(): Unit ={
+ zkClient.deleteTopicZNode(topic1)
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.deleteTopicZNode(topic1)
+ assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+ }
+
+ @Test
+ def testDeleteTopicPathMethods() {
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -394,17 +508,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
+ private def assertPathExistenceAndData(expectedPath: String, data: String){
+ assertTrue(zkClient.pathExists(expectedPath))
+ assertEquals(Some(data), dataAsString(expectedPath))
+ }
+
@Test
- def testEntityConfigManagementMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testCreateTokenChangeNotification() {
+ intercept[NoNodeException] {
+ zkClient.createTokenChangeNotification("delegationToken")
+ }
+ zkClient.createDelegationTokenPaths()
- assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+ zkClient.createTokenChangeNotification("delegationToken")
+ assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ }
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, "1024")
- logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ @Test
+ def testEntityConfigManagementMethods() {
+ assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +543,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerRegistrationMethods() {
+ def testCreateConfigChangeNotification() {
+ intercept[NoNodeException] {
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+ }
+
+ zkClient.createTopLevelPaths()
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+ assertPathExistenceAndData(
+ s"/config/changes/config_change_0000000000",
+ """{"version":2,"entity_path":"/config/topics/topic1"}""")
+ }
+
+ private def createLogProps(bytesProp: Int) = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps
+ }
+
+ private val logProps = createLogProps(1024)
+
+ @Test
+ def testGetLogConfigs() {
+ val emptyConfig = LogConfig(Collections.emptyMap())
+ assertEquals("Non existent config, no defaults",
+ (Map(topic1 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+ val logProps2 = createLogProps(2048)
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals("One existing and one non-existent topic",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+ assertEquals("Two existing topics",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ val logProps1WithMoreValues = createLogProps(1024)
+ logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+ logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+ assertEquals("Config with defaults",
+ (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1),
+ Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+ }
+
+ private def createBrokerInfo(id: Int, host: String, port: Int,
+ securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+ BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+ (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+ @Test
+ def testRegisterBrokerInfo() {
zkClient.createTopLevelPaths()
- val brokerInfo = BrokerInfo(Broker(1,
- Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
- rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+ val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+ // Node exists, owned by current session - no error, no update
+ zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+ // Other client tries to register broker with same id causes failure, info is not changed in ZK
+ intercept[NodeExistsException] {
+ otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ }
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ }
+
+ @Test
+ def testGetBrokerMethods() {
+ zkClient.createTopLevelPaths()
+
+ assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+ assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+ assertEquals(None, zkClient.getBroker(0))
+
+ val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+ val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+ zkClient.registerBrokerInZk(brokerInfo1)
+ otherZkClient.registerBrokerInZk(brokerInfo0)
+
+ assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+ assertEquals(
+ Seq(brokerInfo0.broker, brokerInfo1.broker),
+ zkClient.getAllBrokersInCluster
+ )
+ assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+ }
+
+ @Test
+ def testUpdateBrokerInfo() {
+ zkClient.createTopLevelPaths()
+
+ // Updating info of a broker not existing in ZK fails
+ val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ intercept[NoNodeException]{
+ zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ }
+
+ zkClient.registerBrokerInZk(originalBrokerInfo)
+
+ val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+ assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+ // Other ZK clients can update info
+ otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+ }
+
+ private def statWithVersion(version: Int) = {
+ val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ stat.setVersion(version)
+ stat
+ }
+
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+ private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int) =
+ leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+ private def checkUpdateLeaderAndIsrResult(
+ expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+ expectedPartitionsToRetry: Seq[TopicPartition],
+ expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+ actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+ val failedPartitionsExcerpt =
+ actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+ assertEquals("Permanently failed updates do not match expected",
+ expectedFailedPartitions, failedPartitionsExcerpt)
+ assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+ expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+ assertEquals("Successful updates do not match expected",
+ expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+ }
+
+ @Test
+ def testUpdateLeaderAndIsr() {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Non-existing topicPartitions
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ mutable.ArrayBuffer.empty,
+ Map(
+ topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+ topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+ zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 1, zkVersion = 1),
+ mutable.ArrayBuffer.empty,
+ Map.empty,
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+ // Try to update with wrong ZK version
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ ArrayBuffer(topicPartition10, topicPartition11),
+ Map.empty,
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+ // Trigger successful, to be retried and failed partitions in same call
+ val mixedState = Map(
+ topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+ topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+ topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+ ArrayBuffer(topicPartition11),
+ Map(
+ topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+ zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+ }
+
+ private def checkGetDataResponse(
+ leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+ topicPartition: TopicPartition,
+ response: GetDataResponse) = {
+ val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+ assertEquals(Code.OK, response.resultCode)
+ assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+ assertEquals(Some(topicPartition), response.ctx)
+ assertEquals(
+ Some(leaderIsrAndControllerEpochs(topicPartition)),
+ TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+ }
+
+ private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+
+ @Test
+ def testGetTopicsAndPartitions() {
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.createRecursive(TopicZNode.path(topic2))
+ assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+ }
+
+ @Test
+ def testCreateAndGetTopicPartitionStatesRaw() {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ assertEquals(
+ Seq(
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ .map(eraseMetadata).toList)
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+ // Trying to create existing topicPartition states fails
+ assertEquals(
+ Seq(
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ null, ResponseMetadata(0, 0)),
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ null, ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+ }
+
+ @Test
+ def testSetTopicPartitionStatesRaw() {
+
+ def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+ topicPartitions.map { topicPartition =>
+ SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+ Some(topicPartition), stat, ResponseMetadata(0, 0))
+ }
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Trying to set non-existing topicPartition's data results in NONODE responses
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+ zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+ _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+ zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+ eraseMetadataAndStat}.toList)
+
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+
+ // Other ZK client can also write the state of a partition
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+ otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+ eraseMetadataAndStat}.toList)
+ }
+
+ @Test
+ def testReassignPartitionsInProgress() {
+ assertFalse(zkClient.reassignPartitionsInProgress)
+ zkClient.createRecursive(ReassignPartitionsZNode.path)
+ assertTrue(zkClient.reassignPartitionsInProgress)
+ }
+
+ @Test
+ def testGetTopicPartitionStates() {
+ assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+ assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+ assertEquals(
+ initialLeaderIsrAndControllerEpochs,
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+ )
+
+ assertEquals(
+ Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionState(topicPartition10)
+ )
+
+ assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+ val notExistingPartition = new TopicPartition(topic1, 2)
+ assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+ assertEquals(
+ Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+ )
+
+ assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+ assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+ }
+
+ private def eraseMetadataAndStat(response: SetDataResponse) = {
+ val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+ response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+ }
+
+ @Test
+ def testControllerEpochMethods() {
+ assertEquals(None, zkClient.getControllerEpoch)
+
+ assertEquals("Setting non existing nodes should return NONODE results",
+ SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+ assertEquals("Creating non existing nodes is OK",
+ CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+ CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+ assertEquals("Updating existing nodes is OK",
+ SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Updating with wrong ZK version returns BADVERSION",
+ SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ }
+
+ @Test
+ def testControllerManagementMethods() {
+ // No controller
+ assertEquals(None, zkClient.getControllerId)
+ // Create controller
+ zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+ assertEquals(Some(1), zkClient.getControllerId)
+ zkClient.deleteController()
+ assertEquals(None, zkClient.getControllerId)
+ }
+
+ @Test
+ def testZNodeChangeHandlerForDataChange(): Unit = {
+ val mockPath = "/foo"
+
+ val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+ val zNodeChangeHandler = new ZNodeChangeHandler {
+ override def handleCreation(): Unit = {
+ znodeChangeHandlerCountDownLatch.countDown()
+ }
+
+ override val path: String = mockPath
+ }
+
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+ zkClient.createRecursive(mockPath)
+ assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -458,7 +964,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
- val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1003,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+ assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1017,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+ //test deleting token
+ assertTrue(zkClient.deleteDelegationToken(tokenId))
+ assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
-
import scala.collection.Set
import scala.collection.JavaConverters._
+
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,6 +45,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
+ var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
@@ -55,15 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ zkClient = createZkClient
+ otherZkClient = createZkClient
adminZkClient = new AdminZkClient(zkClient)
}
+ protected def createZkClient = {
+ KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
+ zkClient.close()
+ if (otherZkClient != null)
+ otherZkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.