You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/02 19:29:11 UTC
[kafka] branch trunk updated (5d87b92 -> 9868976)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.
omit 5d87b92 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
omit 57c1a68 Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."
omit f6b3906 Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
omit e033799 Revert "Additional tests to improve test coverage of KafkaZkClient."
omit 6f24af3 Revert "Use move otherZkClient to KafkaZkClientTest"
omit 0724179 Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
omit 4110ad5 Revert "Fixes requested by reviewer"
omit ae52e10 Revert "Minor fixes requested by reviewer"
omit aa3453c Revert "Changes requeted by reviewer"
omit 720e226 Revert "removed method updatedLeaderIsrAndControllerEpochs"
omit 8100fd9 removed method updatedLeaderIsrAndControllerEpochs
omit cacd377 Changes requeted by reviewer
omit 1ae4c1d Minor fixes requested by reviewer
omit ae51a15 Fixes requested by reviewer
omit 155ac59 Undo unintentional whitespace change in ZooKeeperTestHarness
omit 54a3220 Use move otherZkClient to KafkaZkClientTest
omit d0eb552 Additional tests to improve test coverage of KafkaZkClient.
omit af598af Fix in deleteLogDirEventNotifications - use correct path for deleted children
omit 0103103 Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
new 9868976 KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs
when a user force-pushes a change (git push --force) and generates a repository
containing something like this:
 * -- * -- B -- O -- O -- O   (5d87b92)
            \
             N -- N -- N   refs/heads/trunk (9868976)
You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.
Any revisions marked "omit" are not gone; other references still
refer to them. Any revisions marked "discard" are gone forever.
The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.
[kafka] 01/01: KAFKA-6111: Improve test coverage of KafkaZkClient,
fix bugs found by new tests;
Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 9868976747fb4a7a4a9b18e1f4050633c226be7d
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Thu Mar 1 18:12:52 2018 -0800
KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 576 ++++++++++++++++++++-
2 files changed, 555 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
- retryRequestUntilConnected(setDataRequest)
+ val response = retryRequestUntilConnected(setDataRequest)
+ response.maybeThrow()
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def deleteLogDirEventNotifications(): Unit = {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
- deleteLogDirEventNotifications(getChildrenResponse.children)
+ deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
*/
package kafka.zk
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
import scala.util.Random
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
+ private val topic1 = "topic1"
+ private val topic2 = "topic2"
+
+ val topicPartition10 = new TopicPartition(topic1, 0)
+ val topicPartition11 = new TopicPartition(topic1, 1)
+ val topicPartition20 = new TopicPartition(topic2, 0)
+ val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+ var otherZkClient: KafkaZkClient = _
+
+ @Before
+ override def setUp(): Unit = {
+ super.setUp()
+ otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
+ @After
+ override def tearDown(): Unit = {
+ if (otherZkClient != null)
+ otherZkClient.close()
+ super.tearDown()
+ }
+
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
// test with non-existing topic
+ assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
@@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
+ assertTrue(zkClient.topicExists(topic1))
+
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -215,6 +249,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testIsrChangeNotificationGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+
+ assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+ // A partition can have multiple notifications
+ assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+ zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+ }
+
+ @Test
+ def testIsrChangeNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+ zkClient.propagateIsrChanges(Set(topicPartition11))
+
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ // Should not fail if called on a non-existent notification
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+ assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+ zkClient.deleteIsrChangeNotifications()
+ assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+ }
+
+ @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -238,6 +309,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testLogDirGetters(): Unit = {
+ assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+ Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+ Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ zkClient.propagateLogDirEvent(brokerId)
+
+ assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.propagateLogDirEvent(brokerId)
+
+ val anotherBrokerId = 4
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+ assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+ assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+ }
+
+ @Test
+ def testLogDirEventNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ val anotherBrokerId = 4
+
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications()
+ assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+ }
+
+ @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicPathMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testDeletePath(): Unit = {
+ val path = "/a/b/c"
+ zkClient.createRecursive(path)
+ zkClient.deletePath(path)
+ assertFalse(zkClient.pathExists(path))
+ }
+
+ @Test
+ def testDeleteTopicZNode(): Unit = {
+ zkClient.deleteTopicZNode(topic1)
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.deleteTopicZNode(topic1)
+ assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+ }
+ @Test
+ def testDeleteTopicPathMethods() {
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
+ private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+ assertTrue(zkClient.pathExists(expectedPath))
+ assertEquals(Some(data), dataAsString(expectedPath))
+ }
+
+ @Test
+ def testCreateTokenChangeNotification(): Unit = {
+ intercept[NoNodeException] {
+ zkClient.createTokenChangeNotification("delegationToken")
+ }
+ zkClient.createDelegationTokenPaths()
+
+ zkClient.createTokenChangeNotification("delegationToken")
+ assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ }
+
@Test
def testEntityConfigManagementMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
-
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, "1024")
- logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerRegistrationMethods() {
+ def testCreateConfigChangeNotification(): Unit = {
+ intercept[NoNodeException] {
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+ }
+
+ zkClient.createTopLevelPaths()
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+ assertPathExistenceAndData(
+ "/config/changes/config_change_0000000000",
+ """{"version":2,"entity_path":"/config/topics/topic1"}""")
+ }
+
+ private def createLogProps(bytesProp: Int): Properties = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps
+ }
+
+ private val logProps = createLogProps(1024)
+
+ @Test
+ def testGetLogConfigs(): Unit = {
+ val emptyConfig = LogConfig(Collections.emptyMap())
+ assertEquals("Non existent config, no defaults",
+ (Map(topic1 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+ val logProps2 = createLogProps(2048)
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals("One existing and one non-existent topic",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+ assertEquals("Two existing topics",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ val logProps1WithMoreValues = createLogProps(1024)
+ logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+ logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+ assertEquals("Config with defaults",
+ (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1),
+ Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+ }
+
+ private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+ rack: Option[String] = None): BrokerInfo =
+ BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+ (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+ @Test
+ def testRegisterBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
- val brokerInfo = BrokerInfo(Broker(1,
- Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
- rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+ val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+ // Node exists, owned by current session - no error, no update
+ zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+ // Other client tries to register broker with same id causes failure, info is not changed in ZK
+ intercept[NodeExistsException] {
+ otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ }
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ }
+
+ @Test
+ def testGetBrokerMethods(): Unit = {
+ zkClient.createTopLevelPaths()
+
+ assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+ assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+ assertEquals(None, zkClient.getBroker(0))
+
+ val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+ val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+ zkClient.registerBrokerInZk(brokerInfo1)
+ otherZkClient.registerBrokerInZk(brokerInfo0)
+
+ assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+ assertEquals(
+ Seq(brokerInfo0.broker, brokerInfo1.broker),
+ zkClient.getAllBrokersInCluster
+ )
+ assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+ }
+
+ @Test
+ def testUpdateBrokerInfo(): Unit = {
+ zkClient.createTopLevelPaths()
+
+ // Updating info of a broker not existing in ZK fails
+ val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ intercept[NoNodeException]{
+ zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ }
+
+ zkClient.registerBrokerInZk(originalBrokerInfo)
+
+ val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+ assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+ // Other ZK clients can update info
+ otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+ }
+
+ private def statWithVersion(version: Int): Stat = {
+ val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ stat.setVersion(version)
+ stat
+ }
+
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(0, 0)
+
+ val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+ leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+ private def checkUpdateLeaderAndIsrResult(
+ expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+ expectedPartitionsToRetry: Seq[TopicPartition],
+ expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+ actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+ val failedPartitionsExcerpt =
+ actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+ assertEquals("Permanently failed updates do not match expected",
+ expectedFailedPartitions, failedPartitionsExcerpt)
+ assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+ expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+ assertEquals("Successful updates do not match expected",
+ expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+ }
+
+ @Test
+ def testUpdateLeaderAndIsr(): Unit = {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Non-existing topicPartitions
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ mutable.ArrayBuffer.empty,
+ Map(
+ topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+ topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+ zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 1, zkVersion = 1),
+ mutable.ArrayBuffer.empty,
+ Map.empty,
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+ // Try to update with wrong ZK version
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ ArrayBuffer(topicPartition10, topicPartition11),
+ Map.empty,
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+ // Trigger successful, to be retried and failed partitions in same call
+ val mixedState = Map(
+ topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+ topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+ topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+ ArrayBuffer(topicPartition11),
+ Map(
+ topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+ zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+ }
+
+ private def checkGetDataResponse(
+ leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+ topicPartition: TopicPartition,
+ response: GetDataResponse): Unit = {
+ val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+ assertEquals(Code.OK, response.resultCode)
+ assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+ assertEquals(Some(topicPartition), response.ctx)
+ assertEquals(
+ Some(leaderIsrAndControllerEpochs(topicPartition)),
+ TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+ }
+
+ private def eraseMetadata(response: CreateResponse): CreateResponse =
+ response.copy(metadata = ResponseMetadata(0, 0))
+
+ @Test
+ def testGetTopicsAndPartitions(): Unit = {
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.createRecursive(TopicZNode.path(topic2))
+ assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+ }
+
+ @Test
+ def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ assertEquals(
+ Seq(
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ .map(eraseMetadata).toList)
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+ // Trying to create existing topicPartition states fails
+ assertEquals(
+ Seq(
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ null, ResponseMetadata(0, 0)),
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ null, ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+ }
+
+ @Test
+ def testSetTopicPartitionStatesRaw(): Unit = {
+
+ def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+ topicPartitions.map { topicPartition =>
+ SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+ Some(topicPartition), stat, ResponseMetadata(0, 0))
+ }
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Trying to set non-existing topicPartition's data results in NONODE responses
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+ zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+ _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+ zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+ eraseMetadataAndStat}.toList)
+
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+
+ // Other ZK client can also write the state of a partition
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+ otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+ eraseMetadataAndStat}.toList)
+ }
+
+ @Test
+ def testReassignPartitionsInProgress(): Unit = {
+ assertFalse(zkClient.reassignPartitionsInProgress)
+ zkClient.createRecursive(ReassignPartitionsZNode.path)
+ assertTrue(zkClient.reassignPartitionsInProgress)
+ }
+
+ @Test
+ def testGetTopicPartitionStates(): Unit = {
+ assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+ assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(
+ initialLeaderIsrAndControllerEpochs,
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+ )
+
+ assertEquals(
+ Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionState(topicPartition10)
+ )
+
+ assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+ val notExistingPartition = new TopicPartition(topic1, 2)
+ assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+ assertEquals(
+ Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+ )
+
+ assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+ assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+ }
+
+ private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+ val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+ response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+ }
+
+ @Test
+ def testControllerEpochMethods(): Unit = {
+ assertEquals(None, zkClient.getControllerEpoch)
+
+ assertEquals("Setting non existing nodes should return NONODE results",
+ SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+ assertEquals("Creating non existing nodes is OK",
+ CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+ CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+ assertEquals("Updating existing nodes is OK",
+ SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Updating with wrong ZK version returns BADVERSION",
+ SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ }
+
+ @Test
+ def testControllerManagementMethods(): Unit = {
+ // No controller
+ assertEquals(None, zkClient.getControllerId)
+ // Create controller
+ zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+ assertEquals(Some(1), zkClient.getControllerId)
+ zkClient.deleteController()
+ assertEquals(None, zkClient.getControllerId)
+ }
+
+ @Test
+ def testZNodeChangeHandlerForDataChange(): Unit = {
+ val mockPath = "/foo"
+
+ val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+ val zNodeChangeHandler = new ZNodeChangeHandler {
+ override def handleCreation(): Unit = {
+ znodeChangeHandlerCountDownLatch.countDown()
+ }
+
+ override val path: String = mockPath
+ }
+
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+ zkClient.createRecursive(mockPath)
+ assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
- val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+ assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+ //test deleting token
+ assertTrue(zkClient.deleteDelegationToken(tokenId))
+ assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.