You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/03/02 19:29:11 UTC

[kafka] branch trunk updated (5d87b92 -> 9868976)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    omit 5d87b92  KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
    omit 57c1a68  Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."
    omit f6b3906  Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
    omit e033799  Revert "Additional tests to improve test coverage of KafkaZkClient."
    omit 6f24af3  Revert "Use move otherZkClient to KafkaZkClientTest"
    omit 0724179  Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
    omit 4110ad5  Revert "Fixes requested by reviewer"
    omit ae52e10  Revert "Minor fixes requested by reviewer"
    omit aa3453c  Revert "Changes requeted by reviewer"
    omit 720e226  Revert "removed method updatedLeaderIsrAndControllerEpochs"
    omit 8100fd9  removed method updatedLeaderIsrAndControllerEpochs
    omit cacd377  Changes requeted by reviewer
    omit 1ae4c1d  Minor fixes requested by reviewer
    omit ae51a15  Fixes requested by reviewer
    omit 155ac59  Undo unintentional whitespace change in ZooKeeperTestHarness
    omit 54a3220  Use move otherZkClient to KafkaZkClientTest
    omit d0eb552  Additional tests to improve test coverage of KafkaZkClient.
    omit af598af  Fix in deleteLogDirEventNotifications - use correct path for deleted children
    omit 0103103  Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
     new 9868976  KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs when a user
force-pushes a change (git push --force) and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (5d87b92)
            \
             N -- N -- N   refs/heads/trunk (9868976)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.

[kafka] 01/01: KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9868976747fb4a7a4a9b18e1f4050633c226be7d
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Thu Mar 1 18:12:52 2018 -0800

    KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests;
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   5 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 576 ++++++++++++++++++++-
 2 files changed, 555 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
@@ -424,7 +425,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
 
     // test with non-existing topic
+    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
@@ -108,6 +140,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
+    assertTrue(zkClient.topicExists(topic1))
+
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -215,6 +249,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testIsrChangeNotificationGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+
+    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+    // A partition can have multiple notifications
+    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+  }
+
+  @Test
+  def testIsrChangeNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+    zkClient.propagateIsrChanges(Set(topicPartition11))
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should not fail if called on a non-existent notification
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+    zkClient.deleteIsrChangeNotifications()
+    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+  }
+
+  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -238,6 +309,54 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testLogDirGetters(): Unit = {
+    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+      Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    zkClient.propagateLogDirEvent(brokerId)
+
+    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.propagateLogDirEvent(brokerId)
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+  }
+
+  @Test
+  def testLogDirEventNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    val anotherBrokerId = 4
+
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications()
+    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+  }
+
+  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -377,10 +496,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicPathMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testDeletePath(): Unit = {
+    val path = "/a/b/c"
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path)
+    assertFalse(zkClient.pathExists(path))
+  }
+
+  @Test
+  def testDeleteTopicZNode(): Unit = {
+    zkClient.deleteTopicZNode(topic1)
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.deleteTopicZNode(topic1)
+    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+  }
 
+  @Test
+  def testDeleteTopicPathMethods() {
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -394,18 +526,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
+  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some(data), dataAsString(expectedPath))
+   }
+
+  @Test
+  def testCreateTokenChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      zkClient.createTokenChangeNotification("delegationToken")
+    }
+    zkClient.createDelegationTokenPaths()
+
+    zkClient.createTokenChangeNotification("delegationToken")
+    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+  }
+
   @Test
   def testEntityConfigManagementMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
-
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, "1024")
-    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
 
@@ -421,15 +561,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBrokerRegistrationMethods() {
+  def testCreateConfigChangeNotification(): Unit = {
+    intercept[NoNodeException] {
+      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+    }
+
+    zkClient.createTopLevelPaths()
+    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+    assertPathExistenceAndData(
+      "/config/changes/config_change_0000000000",
+      """{"version":2,"entity_path":"/config/topics/topic1"}""")
+  }
+
+  private def createLogProps(bytesProp: Int): Properties = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps
+  }
+
+  private val logProps = createLogProps(1024)
+
+  @Test
+  def testGetLogConfigs(): Unit = {
+    val emptyConfig = LogConfig(Collections.emptyMap())
+    assertEquals("Non existent config, no defaults",
+      (Map(topic1 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+    val logProps2 = createLogProps(2048)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals("One existing and one non-existent topic",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+    assertEquals("Two existing topics",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    val logProps1WithMoreValues = createLogProps(1024)
+    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+    assertEquals("Config with defaults",
+      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1),
+        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+  }
+
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+  @Test
+  def testRegisterBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = BrokerInfo(Broker(1,
-      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
-      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+    // Node exists, owned by current session - no error, no update
+    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+    // Other client tries to register broker with same id causes failure, info is not changed in ZK
+    intercept[NodeExistsException] {
+      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    }
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+  }
+
+  @Test
+  def testGetBrokerMethods(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+    assertEquals(None, zkClient.getBroker(0))
+
+    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+    zkClient.registerBrokerInZk(brokerInfo1)
+    otherZkClient.registerBrokerInZk(brokerInfo0)
+
+    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+    assertEquals(
+      Seq(brokerInfo0.broker, brokerInfo1.broker),
+      zkClient.getAllBrokersInCluster
+    )
+    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+  }
+
+  @Test
+  def testUpdateBrokerInfo(): Unit = {
+    zkClient.createTopLevelPaths()
+
+    // Updating info of a broker not existing in ZK fails
+    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    intercept[NoNodeException]{
+      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    }
+
+    zkClient.registerBrokerInZk(originalBrokerInfo)
+
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Other ZK clients can update info
+    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+  }
+
+  private def statWithVersion(version: Int): Stat = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    stat.setVersion(version)
+    stat
+  }
+
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    Map(
+      topicPartition10 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+        controllerEpoch = 4),
+      topicPartition11 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+        controllerEpoch = 4))
+
+  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(0, 0)
+
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+  private def checkUpdateLeaderAndIsrResult(
+                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+                  expectedPartitionsToRetry: Seq[TopicPartition],
+                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+    val failedPartitionsExcerpt =
+      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+    assertEquals("Permanently failed updates do not match expected",
+      expectedFailedPartitions, failedPartitionsExcerpt)
+    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+    assertEquals("Successful updates do not match expected",
+      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+  }
+
+  @Test
+  def testUpdateLeaderAndIsr(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Non-existing topicPartitions
+    checkUpdateLeaderAndIsrResult(
+        Map.empty,
+        mutable.ArrayBuffer.empty,
+      Map(
+        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 1, zkVersion = 1),
+      mutable.ArrayBuffer.empty,
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+    // Try to update with wrong ZK version
+    checkUpdateLeaderAndIsrResult(
+      Map.empty,
+      ArrayBuffer(topicPartition10, topicPartition11),
+      Map.empty,
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+
+    // Trigger successful, to be retried and failed partitions in same call
+    val mixedState = Map(
+      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      ArrayBuffer(topicPartition11),
+      Map(
+        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+  }
+
+  private def checkGetDataResponse(
+      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+      topicPartition: TopicPartition,
+      response: GetDataResponse): Unit = {
+    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+    assertEquals(Code.OK, response.resultCode)
+    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+    assertEquals(Some(topicPartition), response.ctx)
+    assertEquals(
+      Some(leaderIsrAndControllerEpochs(topicPartition)),
+      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+  }
+
+  private def eraseMetadata(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0))
+
+  @Test
+  def testGetTopicsAndPartitions(): Unit = {
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.createRecursive(TopicZNode.path(topic2))
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+  }
+
+  @Test
+  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    assertEquals(
+      Seq(
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+        .map(eraseMetadata).toList)
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+    // Trying to create existing topicPartition states fails
+    assertEquals(
+      Seq(
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          null, ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+  }
+
+  @Test
+  def testSetTopicPartitionStatesRaw(): Unit = {
+
+    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+      topicPartitions.map { topicPartition =>
+        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+          Some(topicPartition), stat, ResponseMetadata(0, 0))
+      }
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Trying to set non-existing topicPartition's data results in NONODE responses
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+        eraseMetadataAndStat}.toList)
+
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+
+    // Other ZK client can also write the state of a partition
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+        eraseMetadataAndStat}.toList)
+  }
+
+  @Test
+  def testReassignPartitionsInProgress(): Unit = {
+    assertFalse(zkClient.reassignPartitionsInProgress)
+    zkClient.createRecursive(ReassignPartitionsZNode.path)
+    assertTrue(zkClient.reassignPartitionsInProgress)
+  }
+
+  @Test
+  def testGetTopicPartitionStates(): Unit = {
+    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(
+      initialLeaderIsrAndControllerEpochs,
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+    )
+
+    assertEquals(
+      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionState(topicPartition10)
+    )
+
+    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+    val notExistingPartition = new TopicPartition(topic1, 2)
+    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+    assertEquals(
+      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+    )
+
+    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+  }
+
+  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+  }
+
+  @Test
+  def testControllerEpochMethods(): Unit = {
+    assertEquals(None, zkClient.getControllerEpoch)
+
+    assertEquals("Setting non existing nodes should return NONODE results",
+      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+    assertEquals("Creating non existing nodes is OK",
+      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+    assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+    assertEquals("Updating existing nodes is OK",
+      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+    assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Updating with wrong ZK version returns BADVERSION",
+      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+  }
+
+  @Test
+  def testControllerManagementMethods(): Unit = {
+    // No controller
+    assertEquals(None, zkClient.getControllerId)
+    // Create controller
+    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    assertEquals(Some(1), zkClient.getControllerId)
+    zkClient.deleteController()
+    assertEquals(None, zkClient.getControllerId)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    val mockPath = "/foo"
+
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+
+      override val path: String = mockPath
+    }
+
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+    zkClient.createRecursive(mockPath)
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -458,7 +982,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
-    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1021,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1035,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    //test deleting token
+    assertTrue(zkClient.deleteDelegationToken(tokenId))
+    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
 }

-- 
To stop receiving notification emails like this one, please contact
ijuma@apache.org.