You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 01:48:40 UTC
[kafka] 08/09: Changes requested by reviewer
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cacd377933ae0e7da0ed08dd33ab69fabde073c5
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Wed Feb 28 20:57:57 2018 -0800
Changes requested by reviewer
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 103 +++++++++++----------
2 files changed, 54 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,8 +89,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
- if (response.resultCode != Code.OK)
- throw KeeperException.create(response.resultCode)
+ response.maybeThrow()
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
var otherZkClient: KafkaZkClient = _
@Before
- override def setUp() {
+ override def setUp(): Unit = {
super.setUp()
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@After
- override def tearDown() {
+ override def tearDown(): Unit = {
if (otherZkClient != null)
otherZkClient.close()
super.tearDown()
@@ -131,9 +131,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
- val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- topicPartition -> Seq(0, 1),
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -311,8 +310,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testLogDirGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+ assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+ Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+ Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
zkClient.createRecursive("/log_dir_event_notification")
@@ -495,7 +496,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath() {
+ def testDeletePath(): Unit = {
val path = "/a/b/c"
zkClient.createRecursive(path)
zkClient.deletePath(path)
@@ -503,7 +504,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicZNode(): Unit ={
+ def testDeleteTopicZNode(): Unit = {
zkClient.deleteTopicZNode(topic1)
zkClient.createRecursive(TopicZNode.path(topic1))
zkClient.deleteTopicZNode(topic1)
@@ -525,20 +526,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String){
+ private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
assertTrue(zkClient.pathExists(expectedPath))
assertEquals(Some(data), dataAsString(expectedPath))
}
@Test
- def testCreateTokenChangeNotification() {
+ def testCreateTokenChangeNotification(): Unit = {
intercept[NoNodeException] {
zkClient.createTokenChangeNotification("delegationToken")
}
zkClient.createDelegationTokenPaths()
zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
}
@Test
@@ -560,7 +561,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification() {
+ def testCreateConfigChangeNotification(): Unit = {
intercept[NoNodeException] {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
}
@@ -569,11 +570,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
assertPathExistenceAndData(
- s"/config/changes/config_change_0000000000",
+ "/config/changes/config_change_0000000000",
"""{"version":2,"entity_path":"/config/topics/topic1"}""")
}
- private def createLogProps(bytesProp: Int) = {
+ private def createLogProps(bytesProp: Int): Properties = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -584,7 +585,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private val logProps = createLogProps(1024)
@Test
- def testGetLogConfigs() {
+ def testGetLogConfigs(): Unit = {
val emptyConfig = LogConfig(Collections.emptyMap())
assertEquals("Non existent config, no defaults",
(Map(topic1 -> emptyConfig), Map.empty),
@@ -612,13 +613,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
}
- private def createBrokerInfo(id: Int, host: String, port: Int,
- securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+ private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+ rack: Option[String] = None): BrokerInfo =
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
(securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
@Test
- def testRegisterBrokerInfo() {
+ def testRegisterBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -640,7 +641,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testGetBrokerMethods() {
+ def testGetBrokerMethods(): Unit = {
zkClient.createTopLevelPaths()
assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -662,7 +663,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateBrokerInfo() {
+ def testUpdateBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
// Updating info of a broker not existing in ZK fails
@@ -682,26 +683,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
- private def statWithVersion(version: Int) = {
+ private def statWithVersion(version: Int): Stat = {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
stat.setVersion(version)
stat
}
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
- private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int) =
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(state, state - 1)
+
+ val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
private def checkUpdateLeaderAndIsrResult(
@@ -720,7 +723,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateLeaderAndIsr() {
+ def testUpdateLeaderAndIsr(): Unit = {
zkClient.createRecursive(TopicZNode.path(topic1))
// Non-existing topicPartitions
@@ -738,14 +741,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
leaderIsrs(state = 1, zkVersion = 1),
mutable.ArrayBuffer.empty,
Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
// Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult(
Map.empty,
ArrayBuffer(topicPartition10, topicPartition11),
Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
// Trigger successful, to be retried and failed partitions in same call
val mixedState = Map(
@@ -764,7 +767,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private def checkGetDataResponse(
leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
topicPartition: TopicPartition,
- response: GetDataResponse) = {
+ response: GetDataResponse): Unit = {
val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
assertEquals(Code.OK, response.resultCode)
assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -774,10 +777,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
}
- private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+ private def eraseMetadata(response: CreateResponse): CreateResponse =
+ response.copy(metadata = ResponseMetadata(0, 0))
@Test
- def testGetTopicsAndPartitions() {
+ def testGetTopicsAndPartitions(): Unit = {
assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
assertTrue(zkClient.getAllPartitions.isEmpty)
@@ -792,7 +796,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateAndGetTopicPartitionStatesRaw() {
+ def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
zkClient.createRecursive(TopicZNode.path(topic1))
assertEquals(
@@ -819,7 +823,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testSetTopicPartitionStatesRaw() {
+ def testSetTopicPartitionStatesRaw(): Unit = {
def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
topicPartitions.map { topicPartition =>
@@ -855,21 +859,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testReassignPartitionsInProgress() {
+ def testReassignPartitionsInProgress(): Unit = {
assertFalse(zkClient.reassignPartitionsInProgress)
zkClient.createRecursive(ReassignPartitionsZNode.path)
assertTrue(zkClient.reassignPartitionsInProgress)
}
@Test
- def testGetTopicPartitionStates() {
+ def testGetTopicPartitionStates(): Unit = {
assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
zkClient.createRecursive(TopicZNode.path(topic1))
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
assertEquals(
initialLeaderIsrAndControllerEpochs,
zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -894,13 +897,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
- private def eraseMetadataAndStat(response: SetDataResponse) = {
+ private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
}
@Test
- def testControllerEpochMethods() {
+ def testControllerEpochMethods(): Unit = {
assertEquals(None, zkClient.getControllerEpoch)
assertEquals("Setting non existing nodes should return NONODE results",
@@ -927,7 +930,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testControllerManagementMethods() {
+ def testControllerManagementMethods(): Unit = {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.