You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:33 UTC
[kafka] 02/09: Revert "Changes requeted by reviewer"
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit aa3453c157b43429a7e01c209aefc4e7979d1f2e
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Changes requeted by reviewer"
This reverts commit cacd377933ae0e7da0ed08dd33ab69fabde073c5.
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 103 ++++++++++-----------
2 files changed, 52 insertions(+), 54 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d61b281..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,7 +89,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
- response.maybeThrow()
+ if (response.resultCode != Code.OK)
+ throw KeeperException.create(response.resultCode)
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
var otherZkClient: KafkaZkClient = _
@Before
- override def setUp(): Unit = {
+ override def setUp() {
super.setUp()
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@After
- override def tearDown(): Unit = {
+ override def tearDown() {
if (otherZkClient != null)
otherZkClient.close()
super.tearDown()
@@ -131,8 +131,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
+ val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- new TopicPartition(topic1, 0) -> Seq(0, 1),
+ topicPartition -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -310,10 +311,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testLogDirGetters(): Unit = {
- assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
- Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
- Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
zkClient.createRecursive("/log_dir_event_notification")
@@ -496,7 +495,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath(): Unit = {
+ def testDeletePath() {
val path = "/a/b/c"
zkClient.createRecursive(path)
zkClient.deletePath(path)
@@ -504,7 +503,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicZNode(): Unit = {
+ def testDeleteTopicZNode(): Unit ={
zkClient.deleteTopicZNode(topic1)
zkClient.createRecursive(TopicZNode.path(topic1))
zkClient.deleteTopicZNode(topic1)
@@ -526,20 +525,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+ private def assertPathExistenceAndData(expectedPath: String, data: String){
assertTrue(zkClient.pathExists(expectedPath))
assertEquals(Some(data), dataAsString(expectedPath))
}
@Test
- def testCreateTokenChangeNotification(): Unit = {
+ def testCreateTokenChangeNotification() {
intercept[NoNodeException] {
zkClient.createTokenChangeNotification("delegationToken")
}
zkClient.createDelegationTokenPaths()
zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
}
@Test
@@ -561,7 +560,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification(): Unit = {
+ def testCreateConfigChangeNotification() {
intercept[NoNodeException] {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
}
@@ -570,11 +569,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
assertPathExistenceAndData(
- "/config/changes/config_change_0000000000",
+ s"/config/changes/config_change_0000000000",
"""{"version":2,"entity_path":"/config/topics/topic1"}""")
}
- private def createLogProps(bytesProp: Int): Properties = {
+ private def createLogProps(bytesProp: Int) = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -585,7 +584,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private val logProps = createLogProps(1024)
@Test
- def testGetLogConfigs(): Unit = {
+ def testGetLogConfigs() {
val emptyConfig = LogConfig(Collections.emptyMap())
assertEquals("Non existent config, no defaults",
(Map(topic1 -> emptyConfig), Map.empty),
@@ -613,13 +612,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
}
- private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
- rack: Option[String] = None): BrokerInfo =
+ private def createBrokerInfo(id: Int, host: String, port: Int,
+ securityProtocol: SecurityProtocol, rack: Option[String] = None) =
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
(securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
@Test
- def testRegisterBrokerInfo(): Unit = {
+ def testRegisterBrokerInfo() {
zkClient.createTopLevelPaths()
val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -641,7 +640,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testGetBrokerMethods(): Unit = {
+ def testGetBrokerMethods() {
zkClient.createTopLevelPaths()
assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -663,7 +662,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateBrokerInfo(): Unit = {
+ def testUpdateBrokerInfo() {
zkClient.createTopLevelPaths()
// Updating info of a broker not existing in ZK fails
@@ -683,28 +682,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
- private def statWithVersion(version: Int): Stat = {
+ private def statWithVersion(version: Int) = {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
stat.setVersion(version)
stat
}
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- leaderIsrAndControllerEpochs(state, state - 1)
-
- val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+ private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int) =
leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
private def checkUpdateLeaderAndIsrResult(
@@ -723,7 +720,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateLeaderAndIsr(): Unit = {
+ def testUpdateLeaderAndIsr() {
zkClient.createRecursive(TopicZNode.path(topic1))
// Non-existing topicPartitions
@@ -741,14 +738,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
leaderIsrs(state = 1, zkVersion = 1),
mutable.ArrayBuffer.empty,
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
// Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult(
Map.empty,
ArrayBuffer(topicPartition10, topicPartition11),
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
// Trigger successful, to be retried and failed partitions in same call
val mixedState = Map(
@@ -767,7 +764,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private def checkGetDataResponse(
leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
topicPartition: TopicPartition,
- response: GetDataResponse): Unit = {
+ response: GetDataResponse) = {
val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
assertEquals(Code.OK, response.resultCode)
assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -777,11 +774,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
}
- private def eraseMetadata(response: CreateResponse): CreateResponse =
- response.copy(metadata = ResponseMetadata(0, 0))
+ private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
@Test
- def testGetTopicsAndPartitions(): Unit = {
+ def testGetTopicsAndPartitions() {
assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
assertTrue(zkClient.getAllPartitions.isEmpty)
@@ -796,7 +792,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+ def testCreateAndGetTopicPartitionStatesRaw() {
zkClient.createRecursive(TopicZNode.path(topic1))
assertEquals(
@@ -823,7 +819,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testSetTopicPartitionStatesRaw(): Unit = {
+ def testSetTopicPartitionStatesRaw() {
def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
topicPartitions.map { topicPartition =>
@@ -859,20 +855,21 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testReassignPartitionsInProgress(): Unit = {
+ def testReassignPartitionsInProgress() {
assertFalse(zkClient.reassignPartitionsInProgress)
zkClient.createRecursive(ReassignPartitionsZNode.path)
assertTrue(zkClient.reassignPartitionsInProgress)
}
@Test
- def testGetTopicPartitionStates(): Unit = {
+ def testGetTopicPartitionStates() {
assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
assertEquals(
initialLeaderIsrAndControllerEpochs,
zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -897,13 +894,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
- private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+ private def eraseMetadataAndStat(response: SetDataResponse) = {
val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
}
@Test
- def testControllerEpochMethods(): Unit = {
+ def testControllerEpochMethods() {
assertEquals(None, zkClient.getControllerEpoch)
assertEquals("Setting non existing nodes should return NONODE results",
@@ -930,7 +927,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testControllerManagementMethods(): Unit = {
+ def testControllerManagementMethods() {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.