You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:38 UTC

[kafka] 07/09: Revert "Additional tests to improve test coverage of KafkaZkClient."

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e03379905096b81362385fefe8c395b3f45105af
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Additional tests to improve test coverage of KafkaZkClient."
    
    This reverts commit d0eb552a891ecd99150ddf4b89985525806c6e31.
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 558 +--------------------
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  18 +-
 2 files changed, 29 insertions(+), 547 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,10 @@
 */
 package kafka.zk
 
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
-import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -31,30 +30,16 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
+import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
 import scala.util.Random
 
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper._
-import org.apache.zookeeper.data.Stat
-
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
-  private val topic1 = "topic1"
-  private val topic2 = "topic2"
-
-  val topicPartition10 = new TopicPartition(topic1, 0)
-  val topicPartition11 = new TopicPartition(topic1, 1)
-  val topicPartition20 = new TopicPartition(topic2, 0)
-  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
-
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -105,18 +90,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    val topic1 = "topic1"
+    val topic2 = "topic2"
 
     // test with non-existing topic
-    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
-    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      topicPartition -> Seq(0, 1),
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -124,8 +108,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
-    assertTrue(zkClient.topicExists(topic1))
-
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -233,43 +215,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testIsrChangeNotificationGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
-
-    zkClient.createRecursive("/isr_change_notification")
-
-    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
-    zkClient.propagateIsrChanges(Set(topicPartition10))
-
-    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
-
-    // A partition can have multiple notifications
-    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
-      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
-  }
-
-  @Test
-  def testIsrChangeNotificationsDeletion(): Unit = {
-    // Should not fail even if parent node does not exist
-    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
-
-    zkClient.createRecursive("/isr_change_notification")
-
-    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
-    zkClient.propagateIsrChanges(Set(topicPartition10))
-    zkClient.propagateIsrChanges(Set(topicPartition11))
-
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-    // Should not fail if called on a non-existent notification
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-
-    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
-    zkClient.deleteIsrChangeNotifications()
-    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
-  }
-
-  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -293,52 +238,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testLogDirGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
-    zkClient.createRecursive("/log_dir_event_notification")
-
-    val brokerId = 3
-    zkClient.propagateLogDirEvent(brokerId)
-
-    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
-    zkClient.propagateLogDirEvent(brokerId)
-
-    val anotherBrokerId = 4
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
-    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
-    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
-  }
-
-  @Test
-  def testLogDirEventNotificationsDeletion(): Unit = {
-    // Should not fail even if parent node does not exist
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
-    zkClient.createRecursive("/log_dir_event_notification")
-
-    val brokerId = 3
-    val anotherBrokerId = 4
-
-    zkClient.propagateLogDirEvent(brokerId)
-    zkClient.propagateLogDirEvent(brokerId)
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
-    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
-
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    zkClient.deleteLogDirEventNotifications()
-    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
-  }
-
-  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -478,23 +377,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath() {
-    val path = "/a/b/c"
-    zkClient.createRecursive(path)
-    zkClient.deletePath(path)
-    assertFalse(zkClient.pathExists(path))
-  }
-
-  @Test
-  def testDeleteTopicZNode(): Unit ={
-    zkClient.deleteTopicZNode(topic1)
-    zkClient.createRecursive(TopicZNode.path(topic1))
-    zkClient.deleteTopicZNode(topic1)
-    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
-  }
-
-  @Test
   def testDeleteTopicPathMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -508,26 +394,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String){
-    assertTrue(zkClient.pathExists(expectedPath))
-    assertEquals(Some(data), dataAsString(expectedPath))
-   }
-
-  @Test
-  def testCreateTokenChangeNotification() {
-    intercept[NoNodeException] {
-      zkClient.createTokenChangeNotification("delegationToken")
-    }
-    zkClient.createDelegationTokenPaths()
-
-    zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
-  }
-
   @Test
   def testEntityConfigManagementMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, "1024")
+    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
 
@@ -543,399 +421,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification() {
-    intercept[NoNodeException] {
-      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-    }
-
-    zkClient.createTopLevelPaths()
-    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-
-    assertPathExistenceAndData(
-      s"/config/changes/config_change_0000000000",
-      """{"version":2,"entity_path":"/config/topics/topic1"}""")
-  }
-
-  private def createLogProps(bytesProp: Int) = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
-    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    logProps
-  }
-
-  private val logProps = createLogProps(1024)
-
-  @Test
-  def testGetLogConfigs() {
-    val emptyConfig = LogConfig(Collections.emptyMap())
-    assertEquals("Non existent config, no defaults",
-      (Map(topic1 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
-
-    val logProps2 = createLogProps(2048)
-
-    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
-    assertEquals("One existing and one non-existent topic",
-      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
-    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
-    assertEquals("Two existing topics",
-      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
-    val logProps1WithMoreValues = createLogProps(1024)
-    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
-    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
-
-    assertEquals("Config with defaults",
-      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1),
-        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
-  }
-
-  private def createBrokerInfo(id: Int, host: String, port: Int,
-                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
-    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
-    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
-
-  @Test
-  def testRegisterBrokerInfo() {
+  def testBrokerRegistrationMethods() {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
-    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    val brokerInfo = BrokerInfo(Broker(1,
+      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
+      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
-
-    // Node exists, owned by current session - no error, no update
-    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
-    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-
-    // Other client tries to register broker with same id causes failure, info is not changed in ZK
-    intercept[NodeExistsException] {
-      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
-    }
-    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-  }
-
-  @Test
-  def testGetBrokerMethods() {
-    zkClient.createTopLevelPaths()
-
-    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
-    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
-    assertEquals(None, zkClient.getBroker(0))
-
-    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
-    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
-
-    zkClient.registerBrokerInZk(brokerInfo1)
-    otherZkClient.registerBrokerInZk(brokerInfo0)
-
-    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
-    assertEquals(
-      Seq(brokerInfo0.broker, brokerInfo1.broker),
-      zkClient.getAllBrokersInCluster
-    )
-    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
-  }
-
-  @Test
-  def testUpdateBrokerInfo() {
-    zkClient.createTopLevelPaths()
-
-    // Updating info of a broker not existing in ZK fails
-    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
-    intercept[NoNodeException]{
-      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
-    }
-
-    zkClient.registerBrokerInZk(originalBrokerInfo)
-
-    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
-    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
-    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
-
-    // Other ZK clients can update info
-    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
-    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
-  }
-
-  private def statWithVersion(version: Int) = {
-    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    stat.setVersion(version)
-    stat
-  }
-
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
-    topicPartition10 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
-      controllerEpoch = 4),
-    topicPartition11 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
-      controllerEpoch = 4))
-
-  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
-  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int) =
-    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
-
-  private def checkUpdateLeaderAndIsrResult(
-                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
-                  expectedPartitionsToRetry: Seq[TopicPartition],
-                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
-                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
-    val failedPartitionsExcerpt =
-      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
-    assertEquals("Permanently failed updates do not match expected",
-      expectedFailedPartitions, failedPartitionsExcerpt)
-    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
-      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
-    assertEquals("Successful updates do not match expected",
-      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
-  }
-
-  @Test
-  def testUpdateLeaderAndIsr() {
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    // Non-existing topicPartitions
-    checkUpdateLeaderAndIsrResult(
-        Map.empty,
-        mutable.ArrayBuffer.empty,
-      Map(
-        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
-        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
-      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
-    checkUpdateLeaderAndIsrResult(
-      leaderIsrs(state = 1, zkVersion = 1),
-      mutable.ArrayBuffer.empty,
-      Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
-    // Try to update with wrong ZK version
-    checkUpdateLeaderAndIsrResult(
-      Map.empty,
-      ArrayBuffer(topicPartition10, topicPartition11),
-      Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
-    // Trigger successful, to be retried and failed partitions in same call
-    val mixedState = Map(
-      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
-      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
-      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
-
-    checkUpdateLeaderAndIsrResult(
-      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
-      ArrayBuffer(topicPartition11),
-      Map(
-        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
-      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
-  }
-
-  private def checkGetDataResponse(
-      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
-      topicPartition: TopicPartition,
-      response: GetDataResponse) = {
-    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
-    assertEquals(Code.OK, response.resultCode)
-    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
-    assertEquals(Some(topicPartition), response.ctx)
-    assertEquals(
-      Some(leaderIsrAndControllerEpochs(topicPartition)),
-      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
-  }
-
-  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
-
-  @Test
-  def testGetTopicsAndPartitions() {
-    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
-    assertTrue(zkClient.getAllPartitions.isEmpty)
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-    zkClient.createRecursive(TopicZNode.path(topic2))
-    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
-
-    assertTrue(zkClient.getAllPartitions.isEmpty)
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-    assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
-  }
-
-  @Test
-  def testCreateAndGetTopicPartitionStatesRaw() {
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    assertEquals(
-      Seq(
-        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
-          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
-        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
-          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-        .map(eraseMetadata).toList)
-
-    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
-    assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
-
-    // Trying to create existing topicPartition states fails
-    assertEquals(
-      Seq(
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
-          null, ResponseMetadata(0, 0)),
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
-          null, ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
-  }
-
-  @Test
-  def testSetTopicPartitionStatesRaw() {
-
-    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
-      topicPartitions.map { topicPartition =>
-        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
-          Some(topicPartition), stat, ResponseMetadata(0, 0))
-      }
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    // Trying to set non-existing topicPartition's data results in NONODE responses
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
-      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
-        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
-      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
-        eraseMetadataAndStat}.toList)
-
-
-    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
-    assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
-
-    // Other ZK client can also write the state of a partition
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
-      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
-        eraseMetadataAndStat}.toList)
-  }
-
-  @Test
-  def testReassignPartitionsInProgress() {
-    assertFalse(zkClient.reassignPartitionsInProgress)
-    zkClient.createRecursive(ReassignPartitionsZNode.path)
-    assertTrue(zkClient.reassignPartitionsInProgress)
-  }
-
-  @Test
-  def testGetTopicPartitionStates() {
-    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
-    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
-    assertEquals(
-      initialLeaderIsrAndControllerEpochs,
-      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
-    )
-
-    assertEquals(
-      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
-      zkClient.getTopicPartitionState(topicPartition10)
-    )
-
-    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
-
-    val notExistingPartition = new TopicPartition(topic1, 2)
-    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
-    assertEquals(
-      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
-      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
-    )
-
-    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
-    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
-
-  }
-
-  private def eraseMetadataAndStat(response: SetDataResponse) = {
-    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
-    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
-  }
-
-  @Test
-  def testControllerEpochMethods() {
-    assertEquals(None, zkClient.getControllerEpoch)
-
-    assertEquals("Setting non existing nodes should return NONODE results",
-      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-
-    assertEquals("Creating non existing nodes is OK",
-      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
-    assertEquals(0, zkClient.getControllerEpoch.get._1)
-
-    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
-      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
-
-    assertEquals("Updating existing nodes is OK",
-      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-    assertEquals(1, zkClient.getControllerEpoch.get._1)
-
-    assertEquals("Updating with wrong ZK version returns BADVERSION",
-      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-  }
-
-  @Test
-  def testControllerManagementMethods() {
-    // No controller
-    assertEquals(None, zkClient.getControllerId)
-    // Create controller
-    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
-    assertEquals(Some(1), zkClient.getControllerId)
-    zkClient.deleteController()
-    assertEquals(None, zkClient.getControllerId)
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDataChange(): Unit = {
-    val mockPath = "/foo"
-
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-
-      override val path: String = mockPath
-    }
-
-    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
-    zkClient.createRecursive(mockPath)
-    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -964,6 +458,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
+    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -1003,7 +498,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
-    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -1017,9 +511,5 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
-
-    //test deleting token
-    assertTrue(zkClient.deleteDelegationToken(tokenId))
-    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
+
 import scala.collection.Set
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,7 +45,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
-  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
@@ -56,22 +55,15 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = createZkClient
-    otherZkClient = createZkClient
-    adminZkClient = new AdminZkClient(zkClient)
-  }
-
-  protected def createZkClient = {
-    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
   }
 
   @After
   def tearDown() {
     if (zkClient != null)
-      zkClient.close()
-    if (otherZkClient != null)
-      otherZkClient.close()
+     zkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.