You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:38 UTC
[kafka] 07/09: Revert "Additional tests to improve test coverage of KafkaZkClient."
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e03379905096b81362385fefe8c395b3f45105af
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Additional tests to improve test coverage of KafkaZkClient."
This reverts commit d0eb552a891ecd99150ddf4b89985525806c6e31.
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 558 +--------------------
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 18 +-
2 files changed, 29 insertions(+), 547 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,10 @@
*/
package kafka.zk
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -31,30 +30,16 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
+import org.apache.zookeeper.KeeperException.NodeExistsException
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
import scala.util.Random
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper._
-import org.apache.zookeeper.data.Stat
-
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
- private val topic1 = "topic1"
- private val topic2 = "topic2"
-
- val topicPartition10 = new TopicPartition(topic1, 0)
- val topicPartition11 = new TopicPartition(topic1, 1)
- val topicPartition20 = new TopicPartition(topic2, 0)
- val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
-
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -105,18 +90,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ val topic1 = "topic1"
+ val topic2 = "topic2"
// test with non-existing topic
- assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
- val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- topicPartition -> Seq(0, 1),
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -124,8 +108,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
- assertTrue(zkClient.topicExists(topic1))
-
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -233,43 +215,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testIsrChangeNotificationGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
-
- zkClient.createRecursive("/isr_change_notification")
-
- zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
- zkClient.propagateIsrChanges(Set(topicPartition10))
-
- assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
-
- // A partition can have multiple notifications
- assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
- zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
- }
-
- @Test
- def testIsrChangeNotificationsDeletion(): Unit = {
- // Should not fail even if parent node does not exist
- zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
-
- zkClient.createRecursive("/isr_change_notification")
-
- zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
- zkClient.propagateIsrChanges(Set(topicPartition10))
- zkClient.propagateIsrChanges(Set(topicPartition11))
-
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
- // Should not fail if called on a non-existent notification
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-
- assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
- zkClient.deleteIsrChangeNotifications()
- assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
- }
-
- @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -293,52 +238,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testLogDirGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
- zkClient.createRecursive("/log_dir_event_notification")
-
- val brokerId = 3
- zkClient.propagateLogDirEvent(brokerId)
-
- assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
- zkClient.propagateLogDirEvent(brokerId)
-
- val anotherBrokerId = 4
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- val notifications012 = Seq("0000000000", "0000000001", "0000000002")
- assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
- assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
- }
-
- @Test
- def testLogDirEventNotificationsDeletion(): Unit = {
- // Should not fail even if parent node does not exist
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
- zkClient.createRecursive("/log_dir_event_notification")
-
- val brokerId = 3
- val anotherBrokerId = 4
-
- zkClient.propagateLogDirEvent(brokerId)
- zkClient.propagateLogDirEvent(brokerId)
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
- assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
-
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- zkClient.deleteLogDirEventNotifications()
- assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
- }
-
- @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -478,23 +377,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath() {
- val path = "/a/b/c"
- zkClient.createRecursive(path)
- zkClient.deletePath(path)
- assertFalse(zkClient.pathExists(path))
- }
-
- @Test
- def testDeleteTopicZNode(): Unit ={
- zkClient.deleteTopicZNode(topic1)
- zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.deleteTopicZNode(topic1)
- assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
- }
-
- @Test
def testDeleteTopicPathMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -508,26 +394,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String){
- assertTrue(zkClient.pathExists(expectedPath))
- assertEquals(Some(data), dataAsString(expectedPath))
- }
-
- @Test
- def testCreateTokenChangeNotification() {
- intercept[NoNodeException] {
- zkClient.createTokenChangeNotification("delegationToken")
- }
- zkClient.createDelegationTokenPaths()
-
- zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
- }
-
@Test
def testEntityConfigManagementMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, "1024")
+ logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -543,399 +421,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification() {
- intercept[NoNodeException] {
- zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
- }
-
- zkClient.createTopLevelPaths()
- zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-
- assertPathExistenceAndData(
- s"/config/changes/config_change_0000000000",
- """{"version":2,"entity_path":"/config/topics/topic1"}""")
- }
-
- private def createLogProps(bytesProp: Int) = {
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
- logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- logProps
- }
-
- private val logProps = createLogProps(1024)
-
- @Test
- def testGetLogConfigs() {
- val emptyConfig = LogConfig(Collections.emptyMap())
- assertEquals("Non existent config, no defaults",
- (Map(topic1 -> emptyConfig), Map.empty),
- zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
-
- val logProps2 = createLogProps(2048)
-
- zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
- assertEquals("One existing and one non-existent topic",
- (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
- zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
- zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
- assertEquals("Two existing topics",
- (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
- zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
- val logProps1WithMoreValues = createLogProps(1024)
- logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
- logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
-
- assertEquals("Config with defaults",
- (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
- zkClient.getLogConfigs(Seq(topic1),
- Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
- }
-
- private def createBrokerInfo(id: Int, host: String, port: Int,
- securityProtocol: SecurityProtocol, rack: Option[String] = None) =
- BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
- (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
-
- @Test
- def testRegisterBrokerInfo() {
+ def testBrokerRegistrationMethods() {
zkClient.createTopLevelPaths()
- val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
- val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ val brokerInfo = BrokerInfo(Broker(1,
+ Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
+ rack = None), ApiVersion.latestVersion, jmxPort = 9998)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
- assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
-
- // Node exists, owned by current session - no error, no update
- zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
- assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-
- // Other client tries to register broker with same id causes failure, info is not changed in ZK
- intercept[NodeExistsException] {
- otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
- }
- assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
- }
-
- @Test
- def testGetBrokerMethods() {
- zkClient.createTopLevelPaths()
-
- assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
- assertEquals(Seq.empty, zkClient.getSortedBrokerList())
- assertEquals(None, zkClient.getBroker(0))
-
- val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
- val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
-
- zkClient.registerBrokerInZk(brokerInfo1)
- otherZkClient.registerBrokerInZk(brokerInfo0)
-
- assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
- assertEquals(
- Seq(brokerInfo0.broker, brokerInfo1.broker),
- zkClient.getAllBrokersInCluster
- )
- assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
- }
-
- @Test
- def testUpdateBrokerInfo() {
- zkClient.createTopLevelPaths()
-
- // Updating info of a broker not existing in ZK fails
- val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
- intercept[NoNodeException]{
- zkClient.updateBrokerInfoInZk(originalBrokerInfo)
- }
-
- zkClient.registerBrokerInZk(originalBrokerInfo)
-
- val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
- zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
- assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
-
- // Other ZK clients can update info
- otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
- assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
- }
-
- private def statWithVersion(version: Int) = {
- val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
- stat.setVersion(version)
- stat
- }
-
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
- private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int) =
- leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
-
- private def checkUpdateLeaderAndIsrResult(
- expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
- expectedPartitionsToRetry: Seq[TopicPartition],
- expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
- actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
- val failedPartitionsExcerpt =
- actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
- assertEquals("Permanently failed updates do not match expected",
- expectedFailedPartitions, failedPartitionsExcerpt)
- assertEquals("Retriable updates (due to BADVERSION) do not match expected",
- expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
- assertEquals("Successful updates do not match expected",
- expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
- }
-
- @Test
- def testUpdateLeaderAndIsr() {
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- // Non-existing topicPartitions
- checkUpdateLeaderAndIsrResult(
- Map.empty,
- mutable.ArrayBuffer.empty,
- Map(
- topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
- topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
- zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
- checkUpdateLeaderAndIsrResult(
- leaderIsrs(state = 1, zkVersion = 1),
- mutable.ArrayBuffer.empty,
- Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
- // Try to update with wrong ZK version
- checkUpdateLeaderAndIsrResult(
- Map.empty,
- ArrayBuffer(topicPartition10, topicPartition11),
- Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
- // Trigger successful, to be retried and failed partitions in same call
- val mixedState = Map(
- topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
- topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
- topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
-
- checkUpdateLeaderAndIsrResult(
- leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
- ArrayBuffer(topicPartition11),
- Map(
- topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
- zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
- }
-
- private def checkGetDataResponse(
- leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
- topicPartition: TopicPartition,
- response: GetDataResponse) = {
- val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
- assertEquals(Code.OK, response.resultCode)
- assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
- assertEquals(Some(topicPartition), response.ctx)
- assertEquals(
- Some(leaderIsrAndControllerEpochs(topicPartition)),
- TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
- }
-
- private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
-
- @Test
- def testGetTopicsAndPartitions() {
- assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
- assertTrue(zkClient.getAllPartitions.isEmpty)
-
- zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.createRecursive(TopicZNode.path(topic2))
- assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
-
- assertTrue(zkClient.getAllPartitions.isEmpty)
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
- assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
- }
-
- @Test
- def testCreateAndGetTopicPartitionStatesRaw() {
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- assertEquals(
- Seq(
- CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
- TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
- CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
- TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
- .map(eraseMetadata).toList)
-
- val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
- assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
-
- // Trying to create existing topicPartition states fails
- assertEquals(
- Seq(
- CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
- null, ResponseMetadata(0, 0)),
- CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
- null, ResponseMetadata(0, 0))),
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
- }
-
- @Test
- def testSetTopicPartitionStatesRaw() {
-
- def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
- topicPartitions.map { topicPartition =>
- SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
- Some(topicPartition), stat, ResponseMetadata(0, 0))
- }
-
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- // Trying to set non-existing topicPartition's data results in NONODE responses
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
- zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
- _.copy(metadata = ResponseMetadata(0, 0))}.toList)
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
- zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
- eraseMetadataAndStat}.toList)
-
-
- val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
- assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
-
- // Other ZK client can also write the state of a partition
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
- otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
- eraseMetadataAndStat}.toList)
- }
-
- @Test
- def testReassignPartitionsInProgress() {
- assertFalse(zkClient.reassignPartitionsInProgress)
- zkClient.createRecursive(ReassignPartitionsZNode.path)
- assertTrue(zkClient.reassignPartitionsInProgress)
- }
-
- @Test
- def testGetTopicPartitionStates() {
- assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
- assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
-
- zkClient.createRecursive(TopicZNode.path(topic1))
-
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
- assertEquals(
- initialLeaderIsrAndControllerEpochs,
- zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
- )
-
- assertEquals(
- Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
- zkClient.getTopicPartitionState(topicPartition10)
- )
-
- assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
-
- val notExistingPartition = new TopicPartition(topic1, 2)
- assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
- assertEquals(
- Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
- zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
- )
-
- assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
- assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
-
- }
-
- private def eraseMetadataAndStat(response: SetDataResponse) = {
- val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
- response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
- }
-
- @Test
- def testControllerEpochMethods() {
- assertEquals(None, zkClient.getControllerEpoch)
-
- assertEquals("Setting non existing nodes should return NONODE results",
- SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-
- assertEquals("Creating non existing nodes is OK",
- CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
- assertEquals(0, zkClient.getControllerEpoch.get._1)
-
- assertEquals("Attemt to create existing nodes should return NODEEXISTS",
- CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
-
- assertEquals("Updating existing nodes is OK",
- SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
- assertEquals(1, zkClient.getControllerEpoch.get._1)
-
- assertEquals("Updating with wrong ZK version returns BADVERSION",
- SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
- }
-
- @Test
- def testControllerManagementMethods() {
- // No controller
- assertEquals(None, zkClient.getControllerId)
- // Create controller
- zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
- assertEquals(Some(1), zkClient.getControllerId)
- zkClient.deleteController()
- assertEquals(None, zkClient.getControllerId)
- }
-
- @Test
- def testZNodeChangeHandlerForDataChange(): Unit = {
- val mockPath = "/foo"
-
- val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
- val zNodeChangeHandler = new ZNodeChangeHandler {
- override def handleCreation(): Unit = {
- znodeChangeHandlerCountDownLatch.countDown()
- }
-
- override val path: String = mockPath
- }
-
- zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
- zkClient.createRecursive(mockPath)
- assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -964,6 +458,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
+ val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -1003,7 +498,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
- assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -1017,9 +511,5 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
-
- //test deleting token
- assertTrue(zkClient.deleteDelegationToken(tokenId))
- assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
+
import scala.collection.Set
import scala.collection.JavaConverters._
-
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,7 +45,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
- var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
@@ -56,22 +55,15 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = createZkClient
- otherZkClient = createZkClient
- adminZkClient = new AdminZkClient(zkClient)
- }
-
- protected def createZkClient = {
- KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ adminZkClient = new AdminZkClient(zkClient)
}
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
- if (otherZkClient != null)
- otherZkClient.close()
+ zkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.