You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:31 UTC

[kafka] branch trunk updated (8100fd9 -> 57c1a68)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 8100fd9  removed method updatedLeaderIsrAndControllerEpochs
     new 720e226  Revert "removed method updatedLeaderIsrAndControllerEpochs"
     new aa3453c  Revert "Changes requeted by reviewer"
     new ae52e10  Revert "Minor fixes requested by reviewer"
     new 4110ad5  Revert "Fixes requested by reviewer"
     new 0724179  Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
     new 6f24af3  Revert "Use move otherZkClient to KafkaZkClientTest"
     new e033799  Revert "Additional tests to improve test coverage of KafkaZkClient."
     new f6b3906  Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
     new 57c1a68  Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   5 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 576 +--------------------
 2 files changed, 26 insertions(+), 555 deletions(-)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 07/09: Revert "Additional tests to improve test coverage of KafkaZkClient."

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e03379905096b81362385fefe8c395b3f45105af
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Additional tests to improve test coverage of KafkaZkClient."
    
    This reverts commit d0eb552a891ecd99150ddf4b89985525806c6e31.
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 558 +--------------------
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala |  18 +-
 2 files changed, 29 insertions(+), 547 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,10 @@
 */
 package kafka.zk
 
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
-import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -31,30 +30,16 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
+import org.apache.zookeeper.KeeperException.NodeExistsException
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
 import scala.util.Random
 
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper._
-import org.apache.zookeeper.data.Stat
-
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
-  private val topic1 = "topic1"
-  private val topic2 = "topic2"
-
-  val topicPartition10 = new TopicPartition(topic1, 0)
-  val topicPartition11 = new TopicPartition(topic1, 1)
-  val topicPartition20 = new TopicPartition(topic2, 0)
-  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
-
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -105,18 +90,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+    val topic1 = "topic1"
+    val topic2 = "topic2"
 
     // test with non-existing topic
-    assertFalse(zkClient.topicExists(topic1))
     assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
     assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
-    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      topicPartition -> Seq(0, 1),
+      new TopicPartition(topic1, 0) -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -124,8 +108,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic assignment
     zkClient.createTopicAssignment(topic1, assignment)
 
-    assertTrue(zkClient.topicExists(topic1))
-
     val expectedAssignment = assignment map { topicAssignment =>
       val partition = topicAssignment._1.partition
       val assignment = topicAssignment._2
@@ -233,43 +215,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testIsrChangeNotificationGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
-
-    zkClient.createRecursive("/isr_change_notification")
-
-    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
-    zkClient.propagateIsrChanges(Set(topicPartition10))
-
-    assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
-
-    // A partition can have multiple notifications
-    assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
-      zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
-  }
-
-  @Test
-  def testIsrChangeNotificationsDeletion(): Unit = {
-    // Should not fail even if parent node does not exist
-    zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
-
-    zkClient.createRecursive("/isr_change_notification")
-
-    zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
-    zkClient.propagateIsrChanges(Set(topicPartition10))
-    zkClient.propagateIsrChanges(Set(topicPartition11))
-
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-    // Should not fail if called on a non-existent notification
-    zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-
-    assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
-    zkClient.deleteIsrChangeNotifications()
-    assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
-  }
-
-  @Test
   def testPropagateLogDir(): Unit = {
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -293,52 +238,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testLogDirGetters(): Unit = {
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
-    zkClient.createRecursive("/log_dir_event_notification")
-
-    val brokerId = 3
-    zkClient.propagateLogDirEvent(brokerId)
-
-    assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
-    zkClient.propagateLogDirEvent(brokerId)
-
-    val anotherBrokerId = 4
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    val notifications012 = Seq("0000000000", "0000000001", "0000000002")
-    assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
-    assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
-  }
-
-  @Test
-  def testLogDirEventNotificationsDeletion(): Unit = {
-    // Should not fail even if parent node does not exist
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
-    zkClient.createRecursive("/log_dir_event_notification")
-
-    val brokerId = 3
-    val anotherBrokerId = 4
-
-    zkClient.propagateLogDirEvent(brokerId)
-    zkClient.propagateLogDirEvent(brokerId)
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
-    assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
-
-    zkClient.propagateLogDirEvent(anotherBrokerId)
-
-    zkClient.deleteLogDirEventNotifications()
-    assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
-  }
-
-  @Test
   def testSetGetAndDeletePartitionReassignment() {
     zkClient.createRecursive(AdminZNode.path)
 
@@ -478,23 +377,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath() {
-    val path = "/a/b/c"
-    zkClient.createRecursive(path)
-    zkClient.deletePath(path)
-    assertFalse(zkClient.pathExists(path))
-  }
-
-  @Test
-  def testDeleteTopicZNode(): Unit ={
-    zkClient.deleteTopicZNode(topic1)
-    zkClient.createRecursive(TopicZNode.path(topic1))
-    zkClient.deleteTopicZNode(topic1)
-    assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
-  }
-
-  @Test
   def testDeleteTopicPathMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
     assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
     assertTrue(zkClient.getTopicDeletions.isEmpty)
 
@@ -508,26 +394,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String){
-    assertTrue(zkClient.pathExists(expectedPath))
-    assertEquals(Some(data), dataAsString(expectedPath))
-   }
-
-  @Test
-  def testCreateTokenChangeNotification() {
-    intercept[NoNodeException] {
-      zkClient.createTokenChangeNotification("delegationToken")
-    }
-    zkClient.createDelegationTokenPaths()
-
-    zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
-  }
-
   @Test
   def testEntityConfigManagementMethods() {
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+
     assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
 
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, "1024")
+    logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
     zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
     assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
 
@@ -543,399 +421,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification() {
-    intercept[NoNodeException] {
-      zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-    }
-
-    zkClient.createTopLevelPaths()
-    zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-
-    assertPathExistenceAndData(
-      s"/config/changes/config_change_0000000000",
-      """{"version":2,"entity_path":"/config/topics/topic1"}""")
-  }
-
-  private def createLogProps(bytesProp: Int) = {
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
-    logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    logProps
-  }
-
-  private val logProps = createLogProps(1024)
-
-  @Test
-  def testGetLogConfigs() {
-    val emptyConfig = LogConfig(Collections.emptyMap())
-    assertEquals("Non existent config, no defaults",
-      (Map(topic1 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
-
-    val logProps2 = createLogProps(2048)
-
-    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
-    assertEquals("One existing and one non-existent topic",
-      (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
-    zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
-    assertEquals("Two existing topics",
-      (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
-    val logProps1WithMoreValues = createLogProps(1024)
-    logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
-    logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
-
-    assertEquals("Config with defaults",
-      (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
-      zkClient.getLogConfigs(Seq(topic1),
-        Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
-  }
-
-  private def createBrokerInfo(id: Int, host: String, port: Int,
-                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
-    BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
-    (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
-
-  @Test
-  def testRegisterBrokerInfo() {
+  def testBrokerRegistrationMethods() {
     zkClient.createTopLevelPaths()
 
-    val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
-    val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+    val brokerInfo = BrokerInfo(Broker(1,
+      Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
+      rack = None), ApiVersion.latestVersion, jmxPort = 9998)
 
     zkClient.registerBrokerInZk(brokerInfo)
     assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-    assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
-
-    // Node exists, owned by current session - no error, no update
-    zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
-    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-
-    // Other client tries to register broker with same id causes failure, info is not changed in ZK
-    intercept[NodeExistsException] {
-      otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
-    }
-    assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-  }
-
-  @Test
-  def testGetBrokerMethods() {
-    zkClient.createTopLevelPaths()
-
-    assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
-    assertEquals(Seq.empty, zkClient.getSortedBrokerList())
-    assertEquals(None, zkClient.getBroker(0))
-
-    val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
-    val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
-
-    zkClient.registerBrokerInZk(brokerInfo1)
-    otherZkClient.registerBrokerInZk(brokerInfo0)
-
-    assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
-    assertEquals(
-      Seq(brokerInfo0.broker, brokerInfo1.broker),
-      zkClient.getAllBrokersInCluster
-    )
-    assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
-  }
-
-  @Test
-  def testUpdateBrokerInfo() {
-    zkClient.createTopLevelPaths()
-
-    // Updating info of a broker not existing in ZK fails
-    val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
-    intercept[NoNodeException]{
-      zkClient.updateBrokerInfoInZk(originalBrokerInfo)
-    }
-
-    zkClient.registerBrokerInZk(originalBrokerInfo)
-
-    val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
-    zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
-    assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
-
-    // Other ZK clients can update info
-    otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
-    assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
-  }
-
-  private def statWithVersion(version: Int) = {
-    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
-    stat.setVersion(version)
-    stat
-  }
-
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
-    topicPartition10 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
-      controllerEpoch = 4),
-    topicPartition11 -> LeaderIsrAndControllerEpoch(
-      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
-      controllerEpoch = 4))
-
-  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
-  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int) =
-    leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
-
-  private def checkUpdateLeaderAndIsrResult(
-                  expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
-                  expectedPartitionsToRetry: Seq[TopicPartition],
-                  expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
-                  actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
-    val failedPartitionsExcerpt =
-      actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
-    assertEquals("Permanently failed updates do not match expected",
-      expectedFailedPartitions, failedPartitionsExcerpt)
-    assertEquals("Retriable updates (due to BADVERSION) do not match expected",
-      expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
-    assertEquals("Successful updates do not match expected",
-      expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
-  }
-
-  @Test
-  def testUpdateLeaderAndIsr() {
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    // Non-existing topicPartitions
-    checkUpdateLeaderAndIsrResult(
-        Map.empty,
-        mutable.ArrayBuffer.empty,
-      Map(
-        topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
-        topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
-      zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
-    checkUpdateLeaderAndIsrResult(
-      leaderIsrs(state = 1, zkVersion = 1),
-      mutable.ArrayBuffer.empty,
-      Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
-    // Try to update with wrong ZK version
-    checkUpdateLeaderAndIsrResult(
-      Map.empty,
-      ArrayBuffer(topicPartition10, topicPartition11),
-      Map.empty,
-      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
-    // Trigger successful, to be retried and failed partitions in same call
-    val mixedState = Map(
-      topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
-      topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
-      topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
-
-    checkUpdateLeaderAndIsrResult(
-      leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
-      ArrayBuffer(topicPartition11),
-      Map(
-        topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
-      zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
-  }
-
-  private def checkGetDataResponse(
-      leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
-      topicPartition: TopicPartition,
-      response: GetDataResponse) = {
-    val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
-    assertEquals(Code.OK, response.resultCode)
-    assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
-    assertEquals(Some(topicPartition), response.ctx)
-    assertEquals(
-      Some(leaderIsrAndControllerEpochs(topicPartition)),
-      TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
-  }
-
-  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
-
-  @Test
-  def testGetTopicsAndPartitions() {
-    assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
-    assertTrue(zkClient.getAllPartitions.isEmpty)
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-    zkClient.createRecursive(TopicZNode.path(topic2))
-    assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
-
-    assertTrue(zkClient.getAllPartitions.isEmpty)
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-    assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
-  }
-
-  @Test
-  def testCreateAndGetTopicPartitionStatesRaw() {
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    assertEquals(
-      Seq(
-        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
-          TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
-        CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
-          TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-        .map(eraseMetadata).toList)
-
-    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
-    assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
-
-    // Trying to create existing topicPartition states fails
-    assertEquals(
-      Seq(
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
-          null, ResponseMetadata(0, 0)),
-        CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
-          null, ResponseMetadata(0, 0))),
-      zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
-  }
-
-  @Test
-  def testSetTopicPartitionStatesRaw() {
-
-    def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
-      topicPartitions.map { topicPartition =>
-        SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
-          Some(topicPartition), stat, ResponseMetadata(0, 0))
-      }
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-    // Trying to set non-existing topicPartition's data results in NONODE responses
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
-      zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
-        _.copy(metadata = ResponseMetadata(0, 0))}.toList)
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
-      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
-        eraseMetadataAndStat}.toList)
-
-
-    val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
-    assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
-
-    // Other ZK client can also write the state of a partition
-    assertEquals(
-      expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
-      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
-        eraseMetadataAndStat}.toList)
-  }
-
-  @Test
-  def testReassignPartitionsInProgress() {
-    assertFalse(zkClient.reassignPartitionsInProgress)
-    zkClient.createRecursive(ReassignPartitionsZNode.path)
-    assertTrue(zkClient.reassignPartitionsInProgress)
-  }
-
-  @Test
-  def testGetTopicPartitionStates() {
-    assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
-    assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
-
-    zkClient.createRecursive(TopicZNode.path(topic1))
-
-
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
-    assertEquals(
-      initialLeaderIsrAndControllerEpochs,
-      zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
-    )
-
-    assertEquals(
-      Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
-      zkClient.getTopicPartitionState(topicPartition10)
-    )
-
-    assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
-
-    val notExistingPartition = new TopicPartition(topic1, 2)
-    assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
-    assertEquals(
-      Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
-      zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
-    )
-
-    assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
-    assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
-
-  }
-
-  private def eraseMetadataAndStat(response: SetDataResponse) = {
-    val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
-    response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
-  }
-
-  @Test
-  def testControllerEpochMethods() {
-    assertEquals(None, zkClient.getControllerEpoch)
-
-    assertEquals("Setting non existing nodes should return NONODE results",
-      SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-
-    assertEquals("Creating non existing nodes is OK",
-      CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
-    assertEquals(0, zkClient.getControllerEpoch.get._1)
-
-    assertEquals("Attemt to create existing nodes should return NODEEXISTS",
-      CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadata(zkClient.createControllerEpochRaw(0)))
-
-    assertEquals("Updating existing nodes is OK",
-      SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-    assertEquals(1, zkClient.getControllerEpoch.get._1)
-
-    assertEquals("Updating with wrong ZK version returns BADVERSION",
-      SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
-      eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-  }
-
-  @Test
-  def testControllerManagementMethods() {
-    // No controller
-    assertEquals(None, zkClient.getControllerId)
-    // Create controller
-    zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
-    assertEquals(Some(1), zkClient.getControllerId)
-    zkClient.deleteController()
-    assertEquals(None, zkClient.getControllerId)
-  }
-
-  @Test
-  def testZNodeChangeHandlerForDataChange(): Unit = {
-    val mockPath = "/foo"
-
-    val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
-    val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation(): Unit = {
-        znodeChangeHandlerCountDownLatch.countDown()
-      }
-
-      override val path: String = mockPath
-    }
-
-    zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
-    zkClient.createRecursive(mockPath)
-    assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
   @Test
@@ -964,6 +458,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
 
+    val topic1 = "topic1"
     val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
 
     zkClient.createPreferredReplicaElection(electionPartitions)
@@ -1003,7 +498,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     // test non-existent token
     assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
-    assertFalse(zkClient.deleteDelegationToken(tokenId))
 
     // create a token
     zkClient.setOrCreateDelegationToken(token)
@@ -1017,9 +511,5 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     //test updated token
     assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
-
-    //test deleting token
-    assertTrue(zkClient.deleteDelegationToken(tokenId))
-    assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
+
 import scala.collection.Set
 import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,7 +45,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
-  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
@@ -56,22 +55,15 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = createZkClient
-    otherZkClient = createZkClient
-    adminZkClient = new AdminZkClient(zkClient)
-  }
-
-  protected def createZkClient = {
-    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
   }
 
   @After
   def tearDown() {
     if (zkClient != null)
-      zkClient.close()
-    if (otherZkClient != null)
-      otherZkClient.close()
+     zkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 02/09: Revert "Changes requeted by reviewer"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aa3453c157b43429a7e01c209aefc4e7979d1f2e
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Changes requeted by reviewer"
    
    This reverts commit cacd377933ae0e7da0ed08dd33ab69fabde073c5.
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   3 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 103 ++++++++++-----------
 2 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d61b281..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,7 +89,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
     val response = retryRequestUntilConnected(setDataRequest)
-    response.maybeThrow()
+     if (response.resultCode != Code.OK)
+       throw KeeperException.create(response.resultCode)
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   var otherZkClient: KafkaZkClient = _
 
   @Before
-  override def setUp(): Unit = {
+  override def setUp() {
     super.setUp()
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
 
   @After
-  override def tearDown(): Unit = {
+  override def tearDown() {
     if (otherZkClient != null)
       otherZkClient.close()
     super.tearDown()
@@ -131,8 +131,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
+    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      topicPartition -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -310,10 +311,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testLogDirGetters(): Unit = {
-    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
-      Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
-      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
 
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -496,7 +495,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath(): Unit = {
+  def testDeletePath() {
     val path = "/a/b/c"
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
@@ -504,7 +503,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicZNode(): Unit = {
+  def testDeleteTopicZNode(): Unit ={
     zkClient.deleteTopicZNode(topic1)
     zkClient.createRecursive(TopicZNode.path(topic1))
     zkClient.deleteTopicZNode(topic1)
@@ -526,20 +525,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+  private def assertPathExistenceAndData(expectedPath: String, data: String){
     assertTrue(zkClient.pathExists(expectedPath))
     assertEquals(Some(data), dataAsString(expectedPath))
    }
 
   @Test
-  def testCreateTokenChangeNotification(): Unit = {
+  def testCreateTokenChangeNotification() {
     intercept[NoNodeException] {
       zkClient.createTokenChangeNotification("delegationToken")
     }
     zkClient.createDelegationTokenPaths()
 
     zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
   }
 
   @Test
@@ -561,7 +560,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification(): Unit = {
+  def testCreateConfigChangeNotification() {
     intercept[NoNodeException] {
       zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
     }
@@ -570,11 +569,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
 
     assertPathExistenceAndData(
-      "/config/changes/config_change_0000000000",
+      s"/config/changes/config_change_0000000000",
       """{"version":2,"entity_path":"/config/topics/topic1"}""")
   }
 
-  private def createLogProps(bytesProp: Int): Properties = {
+  private def createLogProps(bytesProp: Int) = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
     logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -585,7 +584,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private val logProps = createLogProps(1024)
 
   @Test
-  def testGetLogConfigs(): Unit = {
+  def testGetLogConfigs() {
     val emptyConfig = LogConfig(Collections.emptyMap())
     assertEquals("Non existent config, no defaults",
       (Map(topic1 -> emptyConfig), Map.empty),
@@ -613,13 +612,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
         Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
   }
 
-  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
-                               rack: Option[String] = None): BrokerInfo =
+  private def createBrokerInfo(id: Int, host: String, port: Int,
+                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
     BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
     (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
 
   @Test
-  def testRegisterBrokerInfo(): Unit = {
+  def testRegisterBrokerInfo() {
     zkClient.createTopLevelPaths()
 
     val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -641,7 +640,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetBrokerMethods(): Unit = {
+  def testGetBrokerMethods() {
     zkClient.createTopLevelPaths()
 
     assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -663,7 +662,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateBrokerInfo(): Unit = {
+  def testUpdateBrokerInfo() {
     zkClient.createTopLevelPaths()
 
     // Updating info of a broker not existing in ZK fails
@@ -683,28 +682,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
   }
 
-  private def statWithVersion(version: Int): Stat = {
+  private def statWithVersion(version: Int) = {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     stat.setVersion(version)
     stat
   }
 
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    Map(
-      topicPartition10 -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
-        controllerEpoch = 4),
-      topicPartition11 -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
-        controllerEpoch = 4))
-
-  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    leaderIsrAndControllerEpochs(state, state - 1)
-
-  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+    topicPartition10 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+      controllerEpoch = 4),
+    topicPartition11 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+      controllerEpoch = 4))
+
+  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int) =
     leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
 
   private def checkUpdateLeaderAndIsrResult(
@@ -723,7 +720,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateLeaderAndIsr(): Unit = {
+  def testUpdateLeaderAndIsr() {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     // Non-existing topicPartitions
@@ -741,14 +738,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       leaderIsrs(state = 1, zkVersion = 1),
       mutable.ArrayBuffer.empty,
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
 
     // Try to update with wrong ZK version
     checkUpdateLeaderAndIsrResult(
       Map.empty,
       ArrayBuffer(topicPartition10, topicPartition11),
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
 
     // Trigger successful, to be retried and failed partitions in same call
     val mixedState = Map(
@@ -767,7 +764,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private def checkGetDataResponse(
       leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
       topicPartition: TopicPartition,
-      response: GetDataResponse): Unit = {
+      response: GetDataResponse) = {
     val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
     assertEquals(Code.OK, response.resultCode)
     assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -777,11 +774,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
   }
 
-  private def eraseMetadata(response: CreateResponse): CreateResponse =
-    response.copy(metadata = ResponseMetadata(0, 0))
+  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
 
   @Test
-  def testGetTopicsAndPartitions(): Unit = {
+  def testGetTopicsAndPartitions() {
     assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
     assertTrue(zkClient.getAllPartitions.isEmpty)
 
@@ -796,7 +792,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+  def testCreateAndGetTopicPartitionStatesRaw() {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     assertEquals(
@@ -823,7 +819,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testSetTopicPartitionStatesRaw(): Unit = {
+  def testSetTopicPartitionStatesRaw() {
 
     def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
       topicPartitions.map { topicPartition =>
@@ -859,20 +855,21 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReassignPartitionsInProgress(): Unit = {
+  def testReassignPartitionsInProgress() {
     assertFalse(zkClient.reassignPartitionsInProgress)
     zkClient.createRecursive(ReassignPartitionsZNode.path)
     assertTrue(zkClient.reassignPartitionsInProgress)
   }
 
   @Test
-  def testGetTopicPartitionStates(): Unit = {
+  def testGetTopicPartitionStates() {
     assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
     assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
 
     zkClient.createRecursive(TopicZNode.path(topic1))
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
     assertEquals(
       initialLeaderIsrAndControllerEpochs,
       zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -897,13 +894,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   }
 
-  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+  private def eraseMetadataAndStat(response: SetDataResponse) = {
     val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
     response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
   }
 
   @Test
-  def testControllerEpochMethods(): Unit = {
+  def testControllerEpochMethods() {
     assertEquals(None, zkClient.getControllerEpoch)
 
     assertEquals("Setting non existing nodes should return NONODE results",
@@ -930,7 +927,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testControllerManagementMethods(): Unit = {
+  def testControllerManagementMethods() {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 03/09: Revert "Minor fixes requested by reviewer"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit ae52e1028e0ec4a13e10fba71c756e5956a43e69
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Minor fixes requested by reviewer"
    
    This reverts commit 1ae4c1d6095a86266ba6d008711570df0eaf5682.
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..5cbb76e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.deleteDelegationToken(tokenId))
     assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
   }
-}
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 08/09: Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f6b39067cff96a046bedee17e49d0c476138dd87
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
    
    This reverts commit af598af20828149bdd45336377e70d26360452fe.
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..145e294 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
+      deleteLogDirEventNotifications(getChildrenResponse.children)
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 01/09: Revert "removed method updatedLeaderIsrAndControllerEpochs"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 720e22677b94e3607f002577f091c23714bcd515
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "removed method updatedLeaderIsrAndControllerEpochs"
    
    This reverts commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e.
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index e44c2c9..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -700,6 +700,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
     leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+    leaderIsrAndControllerEpochs(state, state - 1)
 
   val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
   private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
@@ -841,18 +843,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
-      zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+      zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
         eraseMetadataAndStat}.toList)
 
 
     val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
     assertEquals(2, getResponses.size)
-    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+    topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
 
     // Other ZK client can also write the state of a partition
     assertEquals(
       expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
-      otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+      otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
         eraseMetadataAndStat}.toList)
   }
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 09/09: Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 57c1a687eb4a9131ddbaddd05ea9e1da0d12450d
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."
    
    This reverts commit 010310388725d6393a73e12c02dff4bb85cf2518.
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 145e294..6545fde 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,9 +88,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    val response = retryRequestUntilConnected(setDataRequest)
-     if (response.resultCode != Code.OK)
-       throw KeeperException.create(response.resultCode)
+    retryRequestUntilConnected(setDataRequest)
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 05/09: Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 0724179a34114368e81eda9ff11b3ec883c760c5
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
    
    This reverts commit 155ac5903aec0f682e19a0f1f17c1091f60c6da5.
---
 core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..af2d53a 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -51,7 +51,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
-  
+
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 06/09: Revert "Use move otherZkClient to KafkaZkClientTest"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 6f24af3cd148af58be9930b2bca7bae477110fbb
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Use move otherZkClient to KafkaZkClientTest"
    
    This reverts commit 54a32205b0fa7545d450104bc28aacf153b70cf6.
---
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 22 ++--------------------
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++++++++++++++------
 2 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 28dbb73..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
 import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
 
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
@@ -31,10 +30,10 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.kafka.common.utils.SecurityUtils
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
@@ -43,7 +42,6 @@ import scala.util.Random
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper._
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.zookeeper.data.Stat
 
 class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -57,22 +55,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val topicPartition20 = new TopicPartition(topic2, 0)
   val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
 
-  var otherZkClient: KafkaZkClient = null
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
-  }
-
-  @After
-  override def tearDown() {
-    if (otherZkClient != null)
-      otherZkClient.close()
-    super.tearDown()
-  }
-
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index af2d53a..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
 
 import javax.security.auth.login.Configuration
 
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import org.junit.{After, AfterClass, Before, BeforeClass}
 import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.test.IntegrationTest
 import org.junit.experimental.categories.Category
-
 import scala.collection.Set
 import scala.collection.JavaConverters._
+
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
@@ -45,25 +45,33 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
   protected val zkAclsEnabled: Option[Boolean] = None
 
   var zkClient: KafkaZkClient = null
+  var otherZkClient: KafkaZkClient = null
   var adminZkClient: AdminZkClient = null
 
   var zookeeper: EmbeddedZookeeper = null
 
   def zkPort: Int = zookeeper.port
   def zkConnect: String = s"127.0.0.1:$zkPort"
-
+  
   @Before
   def setUp() {
     zookeeper = new EmbeddedZookeeper()
-    zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
-      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+    zkClient = createZkClient
+    otherZkClient = createZkClient
     adminZkClient = new AdminZkClient(zkClient)
   }
 
+  protected def createZkClient = {
+    KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
   @After
   def tearDown() {
     if (zkClient != null)
-     zkClient.close()
+      zkClient.close()
+    if (otherZkClient != null)
+      otherZkClient.close()
     if (zookeeper != null)
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.

[kafka] 04/09: Revert "Fixes requested by reviewer"

Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4110ad5208621a4bc0d184e95c909662b2819204
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Fixes requested by reviewer"
    
    This reverts commit ae51a15026e0dc10ef2df4bcafc85807dc8b4eb6.
---
 core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 5cbb76e..28dbb73 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,6 +19,7 @@ package kafka.zk
 import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
 
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
@@ -56,7 +57,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   val topicPartition20 = new TopicPartition(topic2, 0)
   val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
 
-  var otherZkClient: KafkaZkClient = _
+  var otherZkClient: KafkaZkClient = null
 
   @Before
   override def setUp() {

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.