You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 01:48:32 UTC

[kafka] branch trunk updated (6cfcc9d -> 8100fd9)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 6cfcc9d  KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
     new 0103103  Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
     new af598af  Fix in deleteLogDirEventNotifications - use correct path for deleted children
     new d0eb552  Additional tests to improve test coverage of KafkaZkClient.
     new 54a3220  Use move otherZkClient to KafkaZkClientTest
     new 155ac59  Undo unintentional whitespace change in ZooKeeperTestHarness
     new ae51a15  Fixes requested by reviewer
     new 1ae4c1d  Minor fixes requested by reviewer
     new cacd377  Changes requeted by reviewer
     new 8100fd9  removed method updatedLeaderIsrAndControllerEpochs

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   5 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 576 ++++++++++++++++++++-
 2 files changed, 555 insertions(+), 26 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 02/09: Fix in deleteLogDirEventNotifications - use correct path for deleted children

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit af598af20828149bdd45336377e70d26360452fe
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:51:38 2018 +0100

    Fix in deleteLogDirEventNotifications - use correct path for deleted children
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 145e294..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 03/09: Additional tests to improve test coverage of KafkaZkClient.

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d0eb552a891ecd99150ddf4b89985525806c6e31
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:52:46 2018 +0100

    Additional tests to improve test coverage of KafkaZkClient.
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 558 ++++++++++++++++++++-
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  18 +-
 2 files changed, 547 insertions(+), 29 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
 */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -30,16 +31,30 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
 import org.junit.Test
-
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,17 +105,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
 
     // test with non-existing topic
+    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
+    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      topicPartition -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -108,6 +124,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
+    assertTrue(zkClient.topicExists(topic1))
+
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -215,6 +233,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testIsrChangeNotificationGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+
+    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+    // A partition can have multiple notifications
+    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+  }
+
+  @Test
+  def testIsrChangeNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+    zkClient.createRecursive("/isr_change_notification")
+
+    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+    zkClient.propagateIsrChanges(Set(topicPartition10))
+    zkClient.propagateIsrChanges(Set(topicPartition11))
+
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+    // Should not fail if called on a non-existent notification
+    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+    zkClient.deleteIsrChangeNotifications()
+    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+  }
+
+  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -238,6 +293,52 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
+  def testLogDirGetters(): Unit = {
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    zkClient.propagateLogDirEvent(brokerId)
+
+    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+    zkClient.propagateLogDirEvent(brokerId)
+
+    val anotherBrokerId = 4
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+  }
+
+  @Test
+  def testLogDirEventNotificationsDeletion(): Unit = {
+    // Should not fail even if parent node does not exist
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    zkClient.createRecursive("/log_dir_event_notification")
+
+    val brokerId = 3
+    val anotherBrokerId = 4
+
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(brokerId)
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+    zkClient.propagateLogDirEvent(anotherBrokerId)
+
+    zkClient.deleteLogDirEventNotifications()
+    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+  }
+
+  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -377,10 +478,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicPathMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testDeletePath() {
+    val path = "/a/b/c"
+    zkClient.createRecursive(path)
+    zkClient.deletePath(path)
+    assertFalse(zkClient.pathExists(path))
+  }
 
+  @Test
+  def testDeleteTopicZNode(): Unit ={
+    zkClient.deleteTopicZNode(topic1)
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.deleteTopicZNode(topic1)
+    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+  }
+
+  @Test
+  def testDeleteTopicPathMethods() {
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -394,17 +508,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
+  private def assertPathExistenceAndData(expectedPath: String, data: String){
+    assertTrue(zkClient.pathExists(expectedPath))
+    assertEquals(Some(data), dataAsString(expectedPath))
+   }
+
   @Test
-  def testEntityConfigManagementMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+  def testCreateTokenChangeNotification() {
+    intercept[NoNodeException] {
+      zkClient.createTokenChangeNotification("delegationToken")
+    }
+    zkClient.createDelegationTokenPaths()
 
-    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+    zkClient.createTokenChangeNotification("delegationToken")
+    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+  }
 
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, "1024")
-    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  @Test
+  def testEntityConfigManagementMethods() {
+    assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +543,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testBrokerRegistrationMethods() {
+  def testCreateConfigChangeNotification() {
+    intercept[NoNodeException] {
+      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+    }
+
+    zkClient.createTopLevelPaths()
+    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+    assertPathExistenceAndData(
+      s"/config/changes/config_change_0000000000",
+      """{"version":2,"entity_path":"/config/topics/topic1"}""")
+  }
+
+  private def createLogProps(bytesProp: Int) = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps
+  }
+
+  private val logProps = createLogProps(1024)
+
+  @Test
+  def testGetLogConfigs() {
+    val emptyConfig = LogConfig(Collections.emptyMap())
+    assertEquals("Non existent config, no defaults",
+      (Map(topic1 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+    val logProps2 = createLogProps(2048)
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+    assertEquals("One existing and one non-existent topic",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+    assertEquals("Two existing topics",
+      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+    val logProps1WithMoreValues = createLogProps(1024)
+    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+    assertEquals("Config with defaults",
+      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+      zkClient.getLogConfigs(Seq(topic1),
+        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+  }
+
+  private def createBrokerInfo(id: Int, host: String, port: Int,
+                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+  @Test
+  def testRegisterBrokerInfo() {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = BrokerInfo(Broker(1,
-      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
-      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+    // Node exists, owned by current session - no error, no update
+    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+    // Other client tries to register broker with same id causes failure, info is not changed in ZK
+    intercept[NodeExistsException] {
+      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+    }
+    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+  }
+
+  @Test
+  def testGetBrokerMethods() {
+    zkClient.createTopLevelPaths()
+
+    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+    assertEquals(None, zkClient.getBroker(0))
+
+    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+    zkClient.registerBrokerInZk(brokerInfo1)
+    otherZkClient.registerBrokerInZk(brokerInfo0)
+
+    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+    assertEquals(
+      Seq(brokerInfo0.broker, brokerInfo1.broker),
+      zkClient.getAllBrokersInCluster
+    )
+    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+  }
+
+  @Test
+  def testUpdateBrokerInfo() {
+    zkClient.createTopLevelPaths()
+
+    // Updating info of a broker not existing in ZK fails
+    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+    intercept[NoNodeException]{
+      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    }
+
+    zkClient.registerBrokerInZk(originalBrokerInfo)
+
+    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+    // Other ZK clients can update info
+    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+  }
+
+  private def statWithVersion(version: Int) = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    stat.setVersion(version)
+    stat
+  }
+
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+    topicPartition10 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+      controllerEpoch = 4),
+    topicPartition11 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+      controllerEpoch = 4))
+
+  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int) =
+    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+  private def checkUpdateLeaderAndIsrResult(
+                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+                  expectedPartitionsToRetry: Seq[TopicPartition],
+                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+    val failedPartitionsExcerpt =
+      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+    assertEquals("Permanently failed updates do not match expected",
+      expectedFailedPartitions, failedPartitionsExcerpt)
+    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+    assertEquals("Successful updates do not match expected",
+      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+  }
+
+  @Test
+  def testUpdateLeaderAndIsr() {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Non-existing topicPartitions
+    checkUpdateLeaderAndIsrResult(
+        Map.empty,
+        mutable.ArrayBuffer.empty,
+      Map(
+        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 1, zkVersion = 1),
+      mutable.ArrayBuffer.empty,
+      Map.empty,
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+    // Try to update with wrong ZK version
+    checkUpdateLeaderAndIsrResult(
+      Map.empty,
+      ArrayBuffer(topicPartition10, topicPartition11),
+      Map.empty,
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+    // Trigger successful, to be retried and failed partitions in same call
+    val mixedState = Map(
+      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+    checkUpdateLeaderAndIsrResult(
+      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+      ArrayBuffer(topicPartition11),
+      Map(
+        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+  }
+
+  private def checkGetDataResponse(
+      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+      topicPartition: TopicPartition,
+      response: GetDataResponse) = {
+    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+    assertEquals(Code.OK, response.resultCode)
+    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+    assertEquals(Some(topicPartition), response.ctx)
+    assertEquals(
+      Some(leaderIsrAndControllerEpochs(topicPartition)),
+      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+  }
+
+  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+
+  @Test
+  def testGetTopicsAndPartitions() {
+    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+    zkClient.createRecursive(TopicZNode.path(topic2))
+    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+    assertTrue(zkClient.getAllPartitions.isEmpty)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+    assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+  }
+
+  @Test
+  def testCreateAndGetTopicPartitionStatesRaw() {
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    assertEquals(
+      Seq(
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+        .map(eraseMetadata).toList)
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+    // Trying to create existing topicPartition states fails
+    assertEquals(
+      Seq(
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+          null, ResponseMetadata(0, 0)),
+        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+          null, ResponseMetadata(0, 0))),
+      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+  }
+
+  @Test
+  def testSetTopicPartitionStatesRaw() {
+
+    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+      topicPartitions.map { topicPartition =>
+        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+          Some(topicPartition), stat, ResponseMetadata(0, 0))
+      }
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+    // Trying to set non-existing topicPartition's data results in NONODE responses
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+        eraseMetadataAndStat}.toList)
+
+
+    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+    assertEquals(2, getResponses.size)
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+
+    // Other ZK client can also write the state of a partition
+    assertEquals(
+      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+        eraseMetadataAndStat}.toList)
+  }
+
+  @Test
+  def testReassignPartitionsInProgress() {
+    assertFalse(zkClient.reassignPartitionsInProgress)
+    zkClient.createRecursive(ReassignPartitionsZNode.path)
+    assertTrue(zkClient.reassignPartitionsInProgress)
+  }
+
+  @Test
+  def testGetTopicPartitionStates() {
+    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+    zkClient.createRecursive(TopicZNode.path(topic1))
+
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+    assertEquals(
+      initialLeaderIsrAndControllerEpochs,
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+    )
+
+    assertEquals(
+      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionState(topicPartition10)
+    )
+
+    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+    val notExistingPartition = new TopicPartition(topic1, 2)
+    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+    assertEquals(
+      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+    )
+
+    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+  }
+
+  private def eraseMetadataAndStat(response: SetDataResponse) = {
+    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+  }
+
+  @Test
+  def testControllerEpochMethods() {
+    assertEquals(None, zkClient.getControllerEpoch)
+
+    assertEquals("Setting non existing nodes should return NONODE results",
+      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+    assertEquals("Creating non existing nodes is OK",
+      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+    assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+    assertEquals("Updating existing nodes is OK",
+      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+    assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+    assertEquals("Updating with wrong ZK version returns BADVERSION",
+      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+  }
+
+  @Test
+  def testControllerManagementMethods() {
+    // No controller
+    assertEquals(None, zkClient.getControllerId)
+    // Create controller
+    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+    assertEquals(Some(1), zkClient.getControllerId)
+    zkClient.deleteController()
+    assertEquals(None, zkClient.getControllerId)
+  }
+
+  @Test
+  def testZNodeChangeHandlerForDataChange(): Unit = {
+    val mockPath = "/foo"
+
+    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+    val zNodeChangeHandler = new ZNodeChangeHandler {
+      override def handleCreation(): Unit = {
+        znodeChangeHandlerCountDownLatch.countDown()
+      }
+
+      override val path: String = mockPath
+    }
+
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+    zkClient.createRecursive(mockPath)
+    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -458,7 +964,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
-    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1003,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1017,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+    //test deleting token
+    assertTrue(zkClient.deleteDelegationToken(tokenId))
+    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
-
 import scala.collection.Set
 import scala.collection.JavaConverters._
+
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,6 +45,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
+  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
@@ -55,15 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    zkClient = createZkClient
+    otherZkClient = createZkClient
     adminZkClient = new AdminZkClient(zkClient)
   }
 
+  protected def createZkClient = {
+    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
   @After
   def tearDown() {
     if (zkClient != null)
-     zkClient.close()
+      zkClient.close()
+    if (otherZkClient != null)
+      otherZkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 05/09: Undo unintentional whitespace change in ZooKeeperTestHarness

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 155ac5903aec0f682e19a0f1f17c1091f60c6da5
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 10:36:20 2018 +0100

    Undo unintentional whitespace change in ZooKeeperTestHarness
---
 core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index af2d53a..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -51,7 +51,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
-
+  
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 07/09: Minor fixes requested by reviewer

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1ae4c1d6095a86266ba6d008711570df0eaf5682
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 21:21:44 2018 +0100

    Minor fixes requested by reviewer
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 5cbb76e..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.deleteDelegationToken(tokenId))
     assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
\ No newline at end of file
+}

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 06/09: Fixes requested by reviewer

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ae51a15026e0dc10ef2df4bcafc85807dc8b4eb6
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 21:19:38 2018 +0100

    Fixes requested by reviewer
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 28dbb73..5cbb76e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
 import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
 
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
@@ -57,7 +56,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val topicPartition20 = new TopicPartition(topic2, 0)
   val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
 
-  var otherZkClient: KafkaZkClient = null
+  var otherZkClient: KafkaZkClient = _
 
   @Before
   override def setUp() {

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 08/09: Changes requeted by reviewer

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit cacd377933ae0e7da0ed08dd33ab69fabde073c5
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Wed Feb 28 20:57:57 2018 -0800

    Changes requeted by reviewer
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   3 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 103 +++++++++++----------
 2 files changed, 54 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,8 +89,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
     val response = retryRequestUntilConnected(setDataRequest)
-     if (response.resultCode != Code.OK)
-       throw KeeperException.create(response.resultCode)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   var otherZkClient: KafkaZkClient = _
 
   @Before
-  override def setUp() {
+  override def setUp(): Unit = {
     super.setUp()
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
 
   @After
-  override def tearDown() {
+  override def tearDown(): Unit = {
     if (otherZkClient != null)
       otherZkClient.close()
     super.tearDown()
@@ -131,9 +131,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
-    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      topicPartition -> Seq(0, 1),
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -311,8 +310,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testLogDirGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+      Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
 
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -495,7 +496,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath() {
+  def testDeletePath(): Unit = {
     val path = "/a/b/c"
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
@@ -503,7 +504,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicZNode(): Unit ={
+  def testDeleteTopicZNode(): Unit = {
     zkClient.deleteTopicZNode(topic1)
     zkClient.createRecursive(TopicZNode.path(topic1))
     zkClient.deleteTopicZNode(topic1)
@@ -525,20 +526,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String){
+  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
     assertTrue(zkClient.pathExists(expectedPath))
     assertEquals(Some(data), dataAsString(expectedPath))
    }
 
   @Test
-  def testCreateTokenChangeNotification() {
+  def testCreateTokenChangeNotification(): Unit = {
     intercept[NoNodeException] {
       zkClient.createTokenChangeNotification("delegationToken")
     }
     zkClient.createDelegationTokenPaths()
 
     zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
   }
 
   @Test
@@ -560,7 +561,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification() {
+  def testCreateConfigChangeNotification(): Unit = {
     intercept[NoNodeException] {
       zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
     }
@@ -569,11 +570,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
 
     assertPathExistenceAndData(
-      s"/config/changes/config_change_0000000000",
+      "/config/changes/config_change_0000000000",
       """{"version":2,"entity_path":"/config/topics/topic1"}""")
   }
 
-  private def createLogProps(bytesProp: Int) = {
+  private def createLogProps(bytesProp: Int): Properties = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
     logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -584,7 +585,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private val logProps = createLogProps(1024)
 
   @Test
-  def testGetLogConfigs() {
+  def testGetLogConfigs(): Unit = {
     val emptyConfig = LogConfig(Collections.emptyMap())
     assertEquals("Non existent config, no defaults",
       (Map(topic1 -> emptyConfig), Map.empty),
@@ -612,13 +613,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
         Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
   }
 
-  private def createBrokerInfo(id: Int, host: String, port: Int,
-                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+                               rack: Option[String] = None): BrokerInfo =
     BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
     (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
 
   @Test
-  def testRegisterBrokerInfo() {
+  def testRegisterBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
     val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -640,7 +641,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetBrokerMethods() {
+  def testGetBrokerMethods(): Unit = {
     zkClient.createTopLevelPaths()
 
     assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -662,7 +663,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateBrokerInfo() {
+  def testUpdateBrokerInfo(): Unit = {
     zkClient.createTopLevelPaths()
 
     // Updating info of a broker not existing in ZK fails
@@ -682,26 +683,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
   }
 
-  private def statWithVersion(version: Int) = {
+  private def statWithVersion(version: Int): Stat = {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     stat.setVersion(version)
     stat
   }
 
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
-    topicPartition10 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
-      controllerEpoch = 4),
-    topicPartition11 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
-      controllerEpoch = 4))
-
-  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
-  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int) =
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    Map(
+      topicPartition10 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+        controllerEpoch = 4),
+      topicPartition11 -> LeaderIsrAndControllerEpoch(
+        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+        controllerEpoch = 4))
+
+  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(state, state - 1)
+
+  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
     leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
 
   private def checkUpdateLeaderAndIsrResult(
@@ -720,7 +723,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateLeaderAndIsr() {
+  def testUpdateLeaderAndIsr(): Unit = {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     // Non-existing topicPartitions
@@ -738,14 +741,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       leaderIsrs(state = 1, zkVersion = 1),
       mutable.ArrayBuffer.empty,
       Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
 
     // Try to update with wrong ZK version
     checkUpdateLeaderAndIsrResult(
       Map.empty,
       ArrayBuffer(topicPartition10, topicPartition11),
       Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
 
     // Trigger successful, to be retried and failed partitions in same call
     val mixedState = Map(
@@ -764,7 +767,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private def checkGetDataResponse(
       leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
       topicPartition: TopicPartition,
-      response: GetDataResponse) = {
+      response: GetDataResponse): Unit = {
     val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
     assertEquals(Code.OK, response.resultCode)
     assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -774,10 +777,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
   }
 
-  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+  private def eraseMetadata(response: CreateResponse): CreateResponse =
+    response.copy(metadata = ResponseMetadata(0, 0))
 
   @Test
-  def testGetTopicsAndPartitions() {
+  def testGetTopicsAndPartitions(): Unit = {
     assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
     assertTrue(zkClient.getAllPartitions.isEmpty)
 
@@ -792,7 +796,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateAndGetTopicPartitionStatesRaw() {
+  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     assertEquals(
@@ -819,7 +823,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testSetTopicPartitionStatesRaw() {
+  def testSetTopicPartitionStatesRaw(): Unit = {
 
     def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
       topicPartitions.map { topicPartition =>
@@ -855,21 +859,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReassignPartitionsInProgress() {
+  def testReassignPartitionsInProgress(): Unit = {
     assertFalse(zkClient.reassignPartitionsInProgress)
     zkClient.createRecursive(ReassignPartitionsZNode.path)
     assertTrue(zkClient.reassignPartitionsInProgress)
   }
 
   @Test
-  def testGetTopicPartitionStates() {
+  def testGetTopicPartitionStates(): Unit = {
     assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
     assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
 
     zkClient.createRecursive(TopicZNode.path(topic1))
 
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
     assertEquals(
       initialLeaderIsrAndControllerEpochs,
       zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -894,13 +897,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   }
 
-  private def eraseMetadataAndStat(response: SetDataResponse) = {
+  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
     val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
     response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
   }
 
   @Test
-  def testControllerEpochMethods() {
+  def testControllerEpochMethods(): Unit = {
     assertEquals(None, zkClient.getControllerEpoch)
 
     assertEquals("Setting non existing nodes should return NONODE results",
@@ -927,7 +930,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testControllerManagementMethods() {
+  def testControllerManagementMethods(): Unit = {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 09/09: removed method updatedLeaderIsrAndControllerEpochs

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Thu Mar 1 14:47:47 2018 -0800

    removed method updatedLeaderIsrAndControllerEpochs
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -700,8 +700,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
     leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    leaderIsrAndControllerEpochs(state, state - 1)
 
   val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
   private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
@@ -843,18 +841,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
-      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
         eraseMetadataAndStat}.toList)
 
 
     val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
     assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
 
     // Other ZK client can also write the state of a partition
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
-      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
         eraseMetadataAndStat}.toList)
   }
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 01/09: Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 010310388725d6393a73e12c02dff4bb85cf2518
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:49:15 2018 +0100

    Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde..145e294 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+     if (response.resultCode != Code.OK)
+       throw KeeperException.create(response.resultCode)
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 04/09: Use move otherZkClient to KafkaZkClientTest

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 54a32205b0fa7545d450104bc28aacf153b70cf6
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 09:03:56 2018 +0100

    Use move otherZkClient to KafkaZkClientTest
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 22 ++++++++++++++++++++--
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++++++--------------
 2 files changed, 26 insertions(+), 16 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..28dbb73 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,6 +19,7 @@ package kafka.zk
 import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
 
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
@@ -30,10 +31,10 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
@@ -42,6 +43,7 @@ import scala.util.Random
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.Stat
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -55,6 +57,22 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val topicPartition20 = new TopicPartition(topic2, 0)
   val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
 
+  var otherZkClient: KafkaZkClient = null
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown() {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..af2d53a 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
+
 import scala.collection.Set
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,33 +45,25 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
-  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
-  
+
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = createZkClient
-    otherZkClient = createZkClient
-    adminZkClient = new AdminZkClient(zkClient)
-  }
-
-  protected def createZkClient = {
-    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
   }
 
   @After
   def tearDown() {
     if (zkClient != null)
-      zkClient.close()
-    if (otherZkClient != null)
-      otherZkClient.close()
+     zkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.