You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 02:03:31 UTC
[kafka] branch trunk updated (8100fd9 -> 57c1a68)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.
from 8100fd9 removed method updatedLeaderIsrAndControllerEpochs
new 720e226 Revert "removed method updatedLeaderIsrAndControllerEpochs"
new aa3453c Revert "Changes requeted by reviewer"
new ae52e10 Revert "Minor fixes requested by reviewer"
new 4110ad5 Revert "Fixes requested by reviewer"
new 0724179 Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
new 6f24af3 Revert "Use move otherZkClient to KafkaZkClientTest"
new e033799 Revert "Additional tests to improve test coverage of KafkaZkClient."
new f6b3906 Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
new 57c1a68 Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."
The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 576 +--------------------
2 files changed, 26 insertions(+), 555 deletions(-)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 07/09: Revert "Additional tests to improve test coverage of
KafkaZkClient."
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e03379905096b81362385fefe8c395b3f45105af
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Additional tests to improve test coverage of KafkaZkClient."
This reverts commit d0eb552a891ecd99150ddf4b89985525806c6e31.
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 558 +--------------------
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 18 +-
2 files changed, 29 insertions(+), 547 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..d3726c2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,11 +16,10 @@
*/
package kafka.zk
-import java.util.{Collections, Properties, UUID}
+import java.util.{Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
-import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.{ApiVersion, LeaderAndIsr}
+import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -31,30 +30,16 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
+import org.apache.zookeeper.KeeperException.NodeExistsException
import org.junit.Assert._
import org.junit.Test
+
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.{Seq, mutable}
import scala.util.Random
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper._
-import org.apache.zookeeper.data.Stat
-
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
- private val topic1 = "topic1"
- private val topic2 = "topic2"
-
- val topicPartition10 = new TopicPartition(topic1, 0)
- val topicPartition11 = new TopicPartition(topic1, 1)
- val topicPartition20 = new TopicPartition(topic2, 0)
- val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
-
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -105,18 +90,17 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ val topic1 = "topic1"
+ val topic2 = "topic2"
// test with non-existing topic
- assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
- val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- topicPartition -> Seq(0, 1),
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -124,8 +108,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
- assertTrue(zkClient.topicExists(topic1))
-
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -233,43 +215,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testIsrChangeNotificationGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
-
- zkClient.createRecursive("/isr_change_notification")
-
- zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
- zkClient.propagateIsrChanges(Set(topicPartition10))
-
- assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
-
- // A partition can have multiple notifications
- assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
- zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
- }
-
- @Test
- def testIsrChangeNotificationsDeletion(): Unit = {
- // Should not fail even if parent node does not exist
- zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
-
- zkClient.createRecursive("/isr_change_notification")
-
- zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
- zkClient.propagateIsrChanges(Set(topicPartition10))
- zkClient.propagateIsrChanges(Set(topicPartition11))
-
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
- // Should not fail if called on a non-existent notification
- zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
-
- assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
- zkClient.deleteIsrChangeNotifications()
- assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
- }
-
- @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -293,52 +238,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testLogDirGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
- zkClient.createRecursive("/log_dir_event_notification")
-
- val brokerId = 3
- zkClient.propagateLogDirEvent(brokerId)
-
- assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
-
- zkClient.propagateLogDirEvent(brokerId)
-
- val anotherBrokerId = 4
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- val notifications012 = Seq("0000000000", "0000000001", "0000000002")
- assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
- assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
- }
-
- @Test
- def testLogDirEventNotificationsDeletion(): Unit = {
- // Should not fail even if parent node does not exist
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
- zkClient.createRecursive("/log_dir_event_notification")
-
- val brokerId = 3
- val anotherBrokerId = 4
-
- zkClient.propagateLogDirEvent(brokerId)
- zkClient.propagateLogDirEvent(brokerId)
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
-
- assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
-
- zkClient.propagateLogDirEvent(anotherBrokerId)
-
- zkClient.deleteLogDirEventNotifications()
- assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
- }
-
- @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -478,23 +377,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath() {
- val path = "/a/b/c"
- zkClient.createRecursive(path)
- zkClient.deletePath(path)
- assertFalse(zkClient.pathExists(path))
- }
-
- @Test
- def testDeleteTopicZNode(): Unit ={
- zkClient.deleteTopicZNode(topic1)
- zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.deleteTopicZNode(topic1)
- assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
- }
-
- @Test
def testDeleteTopicPathMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -508,26 +394,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String){
- assertTrue(zkClient.pathExists(expectedPath))
- assertEquals(Some(data), dataAsString(expectedPath))
- }
-
- @Test
- def testCreateTokenChangeNotification() {
- intercept[NoNodeException] {
- zkClient.createTokenChangeNotification("delegationToken")
- }
- zkClient.createDelegationTokenPaths()
-
- zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
- }
-
@Test
def testEntityConfigManagementMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, "1024")
+ logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -543,399 +421,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification() {
- intercept[NoNodeException] {
- zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
- }
-
- zkClient.createTopLevelPaths()
- zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
-
- assertPathExistenceAndData(
- s"/config/changes/config_change_0000000000",
- """{"version":2,"entity_path":"/config/topics/topic1"}""")
- }
-
- private def createLogProps(bytesProp: Int) = {
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
- logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- logProps
- }
-
- private val logProps = createLogProps(1024)
-
- @Test
- def testGetLogConfigs() {
- val emptyConfig = LogConfig(Collections.emptyMap())
- assertEquals("Non existent config, no defaults",
- (Map(topic1 -> emptyConfig), Map.empty),
- zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
-
- val logProps2 = createLogProps(2048)
-
- zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
- assertEquals("One existing and one non-existent topic",
- (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
- zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
- zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
- assertEquals("Two existing topics",
- (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
- zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
-
- val logProps1WithMoreValues = createLogProps(1024)
- logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
- logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
-
- assertEquals("Config with defaults",
- (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
- zkClient.getLogConfigs(Seq(topic1),
- Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
- }
-
- private def createBrokerInfo(id: Int, host: String, port: Int,
- securityProtocol: SecurityProtocol, rack: Option[String] = None) =
- BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
- (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
-
- @Test
- def testRegisterBrokerInfo() {
+ def testBrokerRegistrationMethods() {
zkClient.createTopLevelPaths()
- val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
- val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ val brokerInfo = BrokerInfo(Broker(1,
+ Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
+ rack = None), ApiVersion.latestVersion, jmxPort = 9998)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
- assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
-
- // Node exists, owned by current session - no error, no update
- zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
- assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
-
- // Other client tries to register broker with same id causes failure, info is not changed in ZK
- intercept[NodeExistsException] {
- otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
- }
- assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
- }
-
- @Test
- def testGetBrokerMethods() {
- zkClient.createTopLevelPaths()
-
- assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
- assertEquals(Seq.empty, zkClient.getSortedBrokerList())
- assertEquals(None, zkClient.getBroker(0))
-
- val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
- val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
-
- zkClient.registerBrokerInZk(brokerInfo1)
- otherZkClient.registerBrokerInZk(brokerInfo0)
-
- assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
- assertEquals(
- Seq(brokerInfo0.broker, brokerInfo1.broker),
- zkClient.getAllBrokersInCluster
- )
- assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
- }
-
- @Test
- def testUpdateBrokerInfo() {
- zkClient.createTopLevelPaths()
-
- // Updating info of a broker not existing in ZK fails
- val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
- intercept[NoNodeException]{
- zkClient.updateBrokerInfoInZk(originalBrokerInfo)
- }
-
- zkClient.registerBrokerInZk(originalBrokerInfo)
-
- val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
- zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
- assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
-
- // Other ZK clients can update info
- otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
- assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
- }
-
- private def statWithVersion(version: Int) = {
- val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
- stat.setVersion(version)
- stat
- }
-
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
- private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int) =
- leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
-
- private def checkUpdateLeaderAndIsrResult(
- expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
- expectedPartitionsToRetry: Seq[TopicPartition],
- expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
- actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
- val failedPartitionsExcerpt =
- actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
- assertEquals("Permanently failed updates do not match expected",
- expectedFailedPartitions, failedPartitionsExcerpt)
- assertEquals("Retriable updates (due to BADVERSION) do not match expected",
- expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
- assertEquals("Successful updates do not match expected",
- expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
- }
-
- @Test
- def testUpdateLeaderAndIsr() {
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- // Non-existing topicPartitions
- checkUpdateLeaderAndIsrResult(
- Map.empty,
- mutable.ArrayBuffer.empty,
- Map(
- topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
- topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
- zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
- checkUpdateLeaderAndIsrResult(
- leaderIsrs(state = 1, zkVersion = 1),
- mutable.ArrayBuffer.empty,
- Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
- // Try to update with wrong ZK version
- checkUpdateLeaderAndIsrResult(
- Map.empty,
- ArrayBuffer(topicPartition10, topicPartition11),
- Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
-
- // Trigger successful, to be retried and failed partitions in same call
- val mixedState = Map(
- topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
- topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
- topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
-
- checkUpdateLeaderAndIsrResult(
- leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
- ArrayBuffer(topicPartition11),
- Map(
- topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
- zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
- }
-
- private def checkGetDataResponse(
- leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
- topicPartition: TopicPartition,
- response: GetDataResponse) = {
- val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
- assertEquals(Code.OK, response.resultCode)
- assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
- assertEquals(Some(topicPartition), response.ctx)
- assertEquals(
- Some(leaderIsrAndControllerEpochs(topicPartition)),
- TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
- }
-
- private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
-
- @Test
- def testGetTopicsAndPartitions() {
- assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
- assertTrue(zkClient.getAllPartitions.isEmpty)
-
- zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.createRecursive(TopicZNode.path(topic2))
- assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
-
- assertTrue(zkClient.getAllPartitions.isEmpty)
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
- assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
- }
-
- @Test
- def testCreateAndGetTopicPartitionStatesRaw() {
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- assertEquals(
- Seq(
- CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
- TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
- CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
- TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
- .map(eraseMetadata).toList)
-
- val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
- assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
-
- // Trying to create existing topicPartition states fails
- assertEquals(
- Seq(
- CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
- null, ResponseMetadata(0, 0)),
- CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
- null, ResponseMetadata(0, 0))),
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
- }
-
- @Test
- def testSetTopicPartitionStatesRaw() {
-
- def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
- topicPartitions.map { topicPartition =>
- SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
- Some(topicPartition), stat, ResponseMetadata(0, 0))
- }
-
- zkClient.createRecursive(TopicZNode.path(topic1))
-
- // Trying to set non-existing topicPartition's data results in NONODE responses
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
- zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
- _.copy(metadata = ResponseMetadata(0, 0))}.toList)
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
-
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
- zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
- eraseMetadataAndStat}.toList)
-
-
- val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
- assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
-
- // Other ZK client can also write the state of a partition
- assertEquals(
- expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
- otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
- eraseMetadataAndStat}.toList)
- }
-
- @Test
- def testReassignPartitionsInProgress() {
- assertFalse(zkClient.reassignPartitionsInProgress)
- zkClient.createRecursive(ReassignPartitionsZNode.path)
- assertTrue(zkClient.reassignPartitionsInProgress)
- }
-
- @Test
- def testGetTopicPartitionStates() {
- assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
- assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
-
- zkClient.createRecursive(TopicZNode.path(topic1))
-
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
- assertEquals(
- initialLeaderIsrAndControllerEpochs,
- zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
- )
-
- assertEquals(
- Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
- zkClient.getTopicPartitionState(topicPartition10)
- )
-
- assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
-
- val notExistingPartition = new TopicPartition(topic1, 2)
- assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
- assertEquals(
- Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
- zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
- )
-
- assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
- assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
-
- }
-
- private def eraseMetadataAndStat(response: SetDataResponse) = {
- val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
- response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
- }
-
- @Test
- def testControllerEpochMethods() {
- assertEquals(None, zkClient.getControllerEpoch)
-
- assertEquals("Setting non existing nodes should return NONODE results",
- SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
-
- assertEquals("Creating non existing nodes is OK",
- CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
- assertEquals(0, zkClient.getControllerEpoch.get._1)
-
- assertEquals("Attemt to create existing nodes should return NODEEXISTS",
- CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadata(zkClient.createControllerEpochRaw(0)))
-
- assertEquals("Updating existing nodes is OK",
- SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
- assertEquals(1, zkClient.getControllerEpoch.get._1)
-
- assertEquals("Updating with wrong ZK version returns BADVERSION",
- SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
- eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
- }
-
- @Test
- def testControllerManagementMethods() {
- // No controller
- assertEquals(None, zkClient.getControllerId)
- // Create controller
- zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
- assertEquals(Some(1), zkClient.getControllerId)
- zkClient.deleteController()
- assertEquals(None, zkClient.getControllerId)
- }
-
- @Test
- def testZNodeChangeHandlerForDataChange(): Unit = {
- val mockPath = "/foo"
-
- val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
- val zNodeChangeHandler = new ZNodeChangeHandler {
- override def handleCreation(): Unit = {
- znodeChangeHandlerCountDownLatch.countDown()
- }
-
- override val path: String = mockPath
- }
-
- zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
- zkClient.createRecursive(mockPath)
- assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -964,6 +458,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
+ val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -1003,7 +498,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
- assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -1017,9 +511,5 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
-
- //test deleting token
- assertTrue(zkClient.deleteDelegationToken(tokenId))
- assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
+
import scala.collection.Set
import scala.collection.JavaConverters._
-
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,7 +45,6 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
- var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
@@ -56,22 +55,15 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = createZkClient
- otherZkClient = createZkClient
- adminZkClient = new AdminZkClient(zkClient)
- }
-
- protected def createZkClient = {
- KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ adminZkClient = new AdminZkClient(zkClient)
}
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
- if (otherZkClient != null)
- otherZkClient.close()
+ zkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 02/09: Revert "Changes requeted by reviewer"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit aa3453c157b43429a7e01c209aefc4e7979d1f2e
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Changes requeted by reviewer"
This reverts commit cacd377933ae0e7da0ed08dd33ab69fabde073c5.
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 103 ++++++++++-----------
2 files changed, 52 insertions(+), 54 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index d61b281..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,7 +89,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
- response.maybeThrow()
+ if (response.resultCode != Code.OK)
+ throw KeeperException.create(response.resultCode)
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
var otherZkClient: KafkaZkClient = _
@Before
- override def setUp(): Unit = {
+ override def setUp() {
super.setUp()
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@After
- override def tearDown(): Unit = {
+ override def tearDown() {
if (otherZkClient != null)
otherZkClient.close()
super.tearDown()
@@ -131,8 +131,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
+ val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- new TopicPartition(topic1, 0) -> Seq(0, 1),
+ topicPartition -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -310,10 +311,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testLogDirGetters(): Unit = {
- assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
- Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
- Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
zkClient.createRecursive("/log_dir_event_notification")
@@ -496,7 +495,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath(): Unit = {
+ def testDeletePath() {
val path = "/a/b/c"
zkClient.createRecursive(path)
zkClient.deletePath(path)
@@ -504,7 +503,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicZNode(): Unit = {
+ def testDeleteTopicZNode(): Unit ={
zkClient.deleteTopicZNode(topic1)
zkClient.createRecursive(TopicZNode.path(topic1))
zkClient.deleteTopicZNode(topic1)
@@ -526,20 +525,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
+ private def assertPathExistenceAndData(expectedPath: String, data: String){
assertTrue(zkClient.pathExists(expectedPath))
assertEquals(Some(data), dataAsString(expectedPath))
}
@Test
- def testCreateTokenChangeNotification(): Unit = {
+ def testCreateTokenChangeNotification() {
intercept[NoNodeException] {
zkClient.createTokenChangeNotification("delegationToken")
}
zkClient.createDelegationTokenPaths()
zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
}
@Test
@@ -561,7 +560,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification(): Unit = {
+ def testCreateConfigChangeNotification() {
intercept[NoNodeException] {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
}
@@ -570,11 +569,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
assertPathExistenceAndData(
- "/config/changes/config_change_0000000000",
+ s"/config/changes/config_change_0000000000",
"""{"version":2,"entity_path":"/config/topics/topic1"}""")
}
- private def createLogProps(bytesProp: Int): Properties = {
+ private def createLogProps(bytesProp: Int) = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -585,7 +584,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private val logProps = createLogProps(1024)
@Test
- def testGetLogConfigs(): Unit = {
+ def testGetLogConfigs() {
val emptyConfig = LogConfig(Collections.emptyMap())
assertEquals("Non existent config, no defaults",
(Map(topic1 -> emptyConfig), Map.empty),
@@ -613,13 +612,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
}
- private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
- rack: Option[String] = None): BrokerInfo =
+ private def createBrokerInfo(id: Int, host: String, port: Int,
+ securityProtocol: SecurityProtocol, rack: Option[String] = None) =
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
(securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
@Test
- def testRegisterBrokerInfo(): Unit = {
+ def testRegisterBrokerInfo() {
zkClient.createTopLevelPaths()
val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -641,7 +640,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testGetBrokerMethods(): Unit = {
+ def testGetBrokerMethods() {
zkClient.createTopLevelPaths()
assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -663,7 +662,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateBrokerInfo(): Unit = {
+ def testUpdateBrokerInfo() {
zkClient.createTopLevelPaths()
// Updating info of a broker not existing in ZK fails
@@ -683,28 +682,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
- private def statWithVersion(version: Int): Stat = {
+ private def statWithVersion(version: Int) = {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
stat.setVersion(version)
stat
}
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- leaderIsrAndControllerEpochs(state, state - 1)
-
- val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+ private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int) =
leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
private def checkUpdateLeaderAndIsrResult(
@@ -723,7 +720,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateLeaderAndIsr(): Unit = {
+ def testUpdateLeaderAndIsr() {
zkClient.createRecursive(TopicZNode.path(topic1))
// Non-existing topicPartitions
@@ -741,14 +738,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
leaderIsrs(state = 1, zkVersion = 1),
mutable.ArrayBuffer.empty,
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
// Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult(
Map.empty,
ArrayBuffer(topicPartition10, topicPartition11),
Map.empty,
- zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
// Trigger successful, to be retried and failed partitions in same call
val mixedState = Map(
@@ -767,7 +764,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private def checkGetDataResponse(
leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
topicPartition: TopicPartition,
- response: GetDataResponse): Unit = {
+ response: GetDataResponse) = {
val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
assertEquals(Code.OK, response.resultCode)
assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -777,11 +774,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
}
- private def eraseMetadata(response: CreateResponse): CreateResponse =
- response.copy(metadata = ResponseMetadata(0, 0))
+ private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
@Test
- def testGetTopicsAndPartitions(): Unit = {
+ def testGetTopicsAndPartitions() {
assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
assertTrue(zkClient.getAllPartitions.isEmpty)
@@ -796,7 +792,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
+ def testCreateAndGetTopicPartitionStatesRaw() {
zkClient.createRecursive(TopicZNode.path(topic1))
assertEquals(
@@ -823,7 +819,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testSetTopicPartitionStatesRaw(): Unit = {
+ def testSetTopicPartitionStatesRaw() {
def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
topicPartitions.map { topicPartition =>
@@ -859,20 +855,21 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testReassignPartitionsInProgress(): Unit = {
+ def testReassignPartitionsInProgress() {
assertFalse(zkClient.reassignPartitionsInProgress)
zkClient.createRecursive(ReassignPartitionsZNode.path)
assertTrue(zkClient.reassignPartitionsInProgress)
}
@Test
- def testGetTopicPartitionStates(): Unit = {
+ def testGetTopicPartitionStates() {
assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
zkClient.createRecursive(TopicZNode.path(topic1))
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
assertEquals(
initialLeaderIsrAndControllerEpochs,
zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -897,13 +894,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
- private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
+ private def eraseMetadataAndStat(response: SetDataResponse) = {
val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
}
@Test
- def testControllerEpochMethods(): Unit = {
+ def testControllerEpochMethods() {
assertEquals(None, zkClient.getControllerEpoch)
assertEquals("Setting non existing nodes should return NONODE results",
@@ -930,7 +927,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testControllerManagementMethods(): Unit = {
+ def testControllerManagementMethods() {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 03/09: Revert "Minor fixes requested by reviewer"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ae52e1028e0ec4a13e10fba71c756e5956a43e69
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Minor fixes requested by reviewer"
This reverts commit 1ae4c1d6095a86266ba6d008711570df0eaf5682.
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..5cbb76e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.deleteDelegationToken(tokenId))
assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 08/09: Revert "Fix in deleteLogDirEventNotifications - use
correct path for deleted children"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f6b39067cff96a046bedee17e49d0c476138dd87
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Fix in deleteLogDirEventNotifications - use correct path for deleted children"
This reverts commit af598af20828149bdd45336377e70d26360452fe.
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..145e294 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def deleteLogDirEventNotifications(): Unit = {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
- deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
+ deleteLogDirEventNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 01/09: Revert "removed method
updatedLeaderIsrAndControllerEpochs"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 720e22677b94e3607f002577f091c23714bcd515
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "removed method updatedLeaderIsrAndControllerEpochs"
This reverts commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e.
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index e44c2c9..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -700,6 +700,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(state, state - 1)
val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
@@ -841,18 +843,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
- zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
+ zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
eraseMetadataAndStat}.toList)
val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
// Other ZK client can also write the state of a partition
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
- otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
+ otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
eraseMetadataAndStat}.toList)
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 09/09: Revert "Fix in updateBrokerInfoInZk,
exception is thrown if response was not OK."
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 57c1a687eb4a9131ddbaddd05ea9e1da0d12450d
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Fix in updateBrokerInfoInZk, exception is thrown if response was not OK."
This reverts commit 010310388725d6393a73e12c02dff4bb85cf2518.
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 145e294..6545fde 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,9 +88,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
- val response = retryRequestUntilConnected(setDataRequest)
- if (response.resultCode != Code.OK)
- throw KeeperException.create(response.resultCode)
+ retryRequestUntilConnected(setDataRequest)
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 05/09: Revert "Undo unintentional whitespace change in
ZooKeeperTestHarness"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 0724179a34114368e81eda9ff11b3ec883c760c5
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Undo unintentional whitespace change in ZooKeeperTestHarness"
This reverts commit 155ac5903aec0f682e19a0f1f17c1091f60c6da5.
---
core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..af2d53a 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -51,7 +51,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
def zkPort: Int = zookeeper.port
def zkConnect: String = s"127.0.0.1:$zkPort"
-
+
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 06/09: Revert "Use move otherZkClient to KafkaZkClientTest"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6f24af3cd148af58be9930b2bca7bae477110fbb
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Use move otherZkClient to KafkaZkClientTest"
This reverts commit 54a32205b0fa7545d450104bc28aacf153b70cf6.
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 22 ++--------------------
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++++++++++++++------
2 files changed, 16 insertions(+), 26 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 28dbb73..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
@@ -31,10 +30,10 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.kafka.common.utils.SecurityUtils
import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
-import org.junit.{After, Before, Test}
+import org.junit.Test
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable}
@@ -43,7 +42,6 @@ import scala.util.Random
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._
-import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.Stat
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -57,22 +55,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val topicPartition20 = new TopicPartition(topic2, 0)
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
- var otherZkClient: KafkaZkClient = null
-
- @Before
- override def setUp() {
- super.setUp()
- otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
- }
-
- @After
- override def tearDown() {
- if (otherZkClient != null)
- otherZkClient.close()
- super.tearDown()
- }
-
private val topicPartition = new TopicPartition("topic", 0)
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index af2d53a..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
-
import scala.collection.Set
import scala.collection.JavaConverters._
+
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,25 +45,33 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
+ var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
def zkPort: Int = zookeeper.port
def zkConnect: String = s"127.0.0.1:$zkPort"
-
+
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ zkClient = createZkClient
+ otherZkClient = createZkClient
adminZkClient = new AdminZkClient(zkClient)
}
+ protected def createZkClient = {
+ KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
+ zkClient.close()
+ if (otherZkClient != null)
+ otherZkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 04/09: Revert "Fixes requested by reviewer"
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4110ad5208621a4bc0d184e95c909662b2819204
Author: Jun Rao <ju...@gmail.com>
AuthorDate: Thu Mar 1 18:02:07 2018 -0800
Revert "Fixes requested by reviewer"
This reverts commit ae51a15026e0dc10ef2df4bcafc85807dc8b4eb6.
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 5cbb76e..28dbb73 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,6 +19,7 @@ package kafka.zk
import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
@@ -56,7 +57,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val topicPartition20 = new TopicPartition(topic2, 0)
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
- var otherZkClient: KafkaZkClient = _
+ var otherZkClient: KafkaZkClient = null
@Before
override def setUp() {
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.