You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:33 UTC

[kafka] 02/09: Revert "Changes requeted by reviewer"

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aa3453c157b43429a7e01c209aefc4e7979d1f2e
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800

    Revert "Changes requeted by reviewer"
    
    This reverts commit cacd377933ae0e7da0ed08dd33ab69fabde073c5.
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   |   3 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    | 103 ++++++++++-----------
 2 files changed, 52 insertions(+), 54 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d61b281..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,7 +89,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
     val response = retryRequestUntilConnected(setDataRequest)
-    response.maybeThrow()
+     if (response.resultCode != Code.OK)
+       throw KeeperException.create(response.resultCode)
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   var otherZkClient: KafkaZkClient = _
 
   @Before
-  override def setUp(): Unit = {
+  override def setUp() {
     super.setUp()
     otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
       zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
   }
 
   @After
-  override def tearDown(): Unit = {
+  override def tearDown() {
     if (otherZkClient != null)
       otherZkClient.close()
     super.tearDown()
@@ -131,8 +131,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
     assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
 
+    val topicPartition = new TopicPartition(topic1, 0)
     val assignment = Map(
-      new TopicPartition(topic1, 0) -> Seq(0, 1),
+      topicPartition -> Seq(0, 1),
       new TopicPartition(topic1, 1) -> Seq(0, 1),
       new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
     )
@@ -310,10 +311,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testLogDirGetters(): Unit = {
-    assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
-      Seq.empty, zkClient.getAllLogDirEventNotifications)
-    assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
-      Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+    assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
 
     zkClient.createRecursive("/log_dir_event_notification")
 
@@ -496,7 +495,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeletePath(): Unit = {
+  def testDeletePath() {
     val path = "/a/b/c"
     zkClient.createRecursive(path)
     zkClient.deletePath(path)
@@ -504,7 +503,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testDeleteTopicZNode(): Unit = {
+  def testDeleteTopicZNode(): Unit ={
     zkClient.deleteTopicZNode(topic1)
     zkClient.createRecursive(TopicZNode.path(topic1))
     zkClient.deleteTopicZNode(topic1)
@@ -526,20 +525,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.getTopicDeletions.isEmpty)
   }
 
-  private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+  private def assertPathExistenceAndData(expectedPath: String, data: String){
     assertTrue(zkClient.pathExists(expectedPath))
     assertEquals(Some(data), dataAsString(expectedPath))
    }
 
   @Test
-  def testCreateTokenChangeNotification(): Unit = {
+  def testCreateTokenChangeNotification() {
     intercept[NoNodeException] {
       zkClient.createTokenChangeNotification("delegationToken")
     }
     zkClient.createDelegationTokenPaths()
 
     zkClient.createTokenChangeNotification("delegationToken")
-    assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+    assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
   }
 
   @Test
@@ -561,7 +560,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateConfigChangeNotification(): Unit = {
+  def testCreateConfigChangeNotification() {
     intercept[NoNodeException] {
       zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
     }
@@ -570,11 +569,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
 
     assertPathExistenceAndData(
-      "/config/changes/config_change_0000000000",
+      s"/config/changes/config_change_0000000000",
       """{"version":2,"entity_path":"/config/topics/topic1"}""")
   }
 
-  private def createLogProps(bytesProp: Int): Properties = {
+  private def createLogProps(bytesProp: Int) = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
     logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -585,7 +584,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private val logProps = createLogProps(1024)
 
   @Test
-  def testGetLogConfigs(): Unit = {
+  def testGetLogConfigs() {
     val emptyConfig = LogConfig(Collections.emptyMap())
     assertEquals("Non existent config, no defaults",
       (Map(topic1 -> emptyConfig), Map.empty),
@@ -613,13 +612,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
         Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
   }
 
-  private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
-                               rack: Option[String] = None): BrokerInfo =
+  private def createBrokerInfo(id: Int, host: String, port: Int,
+                               securityProtocol: SecurityProtocol, rack: Option[String] = None) =
     BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
     (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
 
   @Test
-  def testRegisterBrokerInfo(): Unit = {
+  def testRegisterBrokerInfo() {
     zkClient.createTopLevelPaths()
 
     val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -641,7 +640,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testGetBrokerMethods(): Unit = {
+  def testGetBrokerMethods() {
     zkClient.createTopLevelPaths()
 
     assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -663,7 +662,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateBrokerInfo(): Unit = {
+  def testUpdateBrokerInfo() {
     zkClient.createTopLevelPaths()
 
     // Updating info of a broker not existing in ZK fails
@@ -683,28 +682,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
   }
 
-  private def statWithVersion(version: Int): Stat = {
+  private def statWithVersion(version: Int) = {
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     stat.setVersion(version)
     stat
   }
 
-  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    Map(
-      topicPartition10 -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
-        controllerEpoch = 4),
-      topicPartition11 -> LeaderIsrAndControllerEpoch(
-        LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
-        controllerEpoch = 4))
-
-  val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    leaderIsrAndControllerEpochs(0, 0)
-  private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
-    leaderIsrAndControllerEpochs(state, state - 1)
-
-  val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
-  private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+  private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+    topicPartition10 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+      controllerEpoch = 4),
+    topicPartition11 -> LeaderIsrAndControllerEpoch(
+      LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+      controllerEpoch = 4))
+
+  private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+  private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+  private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+  private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+  private def leaderIsrs(state: Int, zkVersion: Int) =
     leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
 
   private def checkUpdateLeaderAndIsrResult(
@@ -723,7 +720,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testUpdateLeaderAndIsr(): Unit = {
+  def testUpdateLeaderAndIsr() {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     // Non-existing topicPartitions
@@ -741,14 +738,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       leaderIsrs(state = 1, zkVersion = 1),
       mutable.ArrayBuffer.empty,
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
 
     // Try to update with wrong ZK version
     checkUpdateLeaderAndIsrResult(
       Map.empty,
       ArrayBuffer(topicPartition10, topicPartition11),
       Map.empty,
-      zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+      zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
 
     // Trigger successful, to be retried and failed partitions in same call
     val mixedState = Map(
@@ -767,7 +764,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   private def checkGetDataResponse(
       leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
       topicPartition: TopicPartition,
-      response: GetDataResponse): Unit = {
+      response: GetDataResponse) = {
     val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
     assertEquals(Code.OK, response.resultCode)
     assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -777,11 +774,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
       TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
   }
 
-  private def eraseMetadata(response: CreateResponse): CreateResponse =
-    response.copy(metadata = ResponseMetadata(0, 0))
+  private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
 
   @Test
-  def testGetTopicsAndPartitions(): Unit = {
+  def testGetTopicsAndPartitions() {
     assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
     assertTrue(zkClient.getAllPartitions.isEmpty)
 
@@ -796,7 +792,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+  def testCreateAndGetTopicPartitionStatesRaw() {
     zkClient.createRecursive(TopicZNode.path(topic1))
 
     assertEquals(
@@ -823,7 +819,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testSetTopicPartitionStatesRaw(): Unit = {
+  def testSetTopicPartitionStatesRaw() {
 
     def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
       topicPartitions.map { topicPartition =>
@@ -859,20 +855,21 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testReassignPartitionsInProgress(): Unit = {
+  def testReassignPartitionsInProgress() {
     assertFalse(zkClient.reassignPartitionsInProgress)
     zkClient.createRecursive(ReassignPartitionsZNode.path)
     assertTrue(zkClient.reassignPartitionsInProgress)
   }
 
   @Test
-  def testGetTopicPartitionStates(): Unit = {
+  def testGetTopicPartitionStates() {
     assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
     assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
 
     zkClient.createRecursive(TopicZNode.path(topic1))
 
-    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+    zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
     assertEquals(
       initialLeaderIsrAndControllerEpochs,
       zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -897,13 +894,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   }
 
-  private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+  private def eraseMetadataAndStat(response: SetDataResponse) = {
     val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
     response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
   }
 
   @Test
-  def testControllerEpochMethods(): Unit = {
+  def testControllerEpochMethods() {
     assertEquals(None, zkClient.getControllerEpoch)
 
     assertEquals("Setting non existing nodes should return NONODE results",
@@ -930,7 +927,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   }
 
   @Test
-  def testControllerManagementMethods(): Unit = {
+  def testControllerManagementMethods() {
     // No controller
     assertEquals(None, zkClient.getControllerId)
     // Create controller

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.