You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/02 01:48:32 UTC
[kafka] branch trunk updated (6cfcc9d -> 8100fd9)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git.
from 6cfcc9d KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync (#4625)
new 0103103 Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
new af598af Fix in deleteLogDirEventNotifications - use correct path for deleted children
new d0eb552 Additional tests to improve test coverage of KafkaZkClient.
new 54a3220 Move otherZkClient to KafkaZkClientTest
new 155ac59 Undo unintentional whitespace change in ZooKeeperTestHarness
new ae51a15 Fixes requested by reviewer
new 1ae4c1d Minor fixes requested by reviewer
new cacd377 Changes requested by reviewer
new 8100fd9 removed method updatedLeaderIsrAndControllerEpochs
The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 5 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 576 ++++++++++++++++++++-
2 files changed, 555 insertions(+), 26 deletions(-)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 02/09: Fix in deleteLogDirEventNotifications - use correct
path for deleted children
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit af598af20828149bdd45336377e70d26360452fe
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:51:38 2018 +0100
Fix in deleteLogDirEventNotifications - use correct path for deleted children
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 145e294..851c686 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -426,7 +426,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def deleteLogDirEventNotifications(): Unit = {
val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
if (getChildrenResponse.resultCode == Code.OK) {
- deleteLogDirEventNotifications(getChildrenResponse.children)
+ deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
getChildrenResponse.maybeThrow
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 03/09: Additional tests to improve test coverage of
KafkaZkClient.
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit d0eb552a891ecd99150ddf4b89985525806c6e31
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:52:46 2018 +0100
Additional tests to improve test coverage of KafkaZkClient.
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 558 ++++++++++++++++++++-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 18 +-
2 files changed, 547 insertions(+), 29 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c2..9329430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
*/
package kafka.zk
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
import kafka.log.LogConfig
import kafka.security.auth._
@@ -30,16 +31,30 @@ import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
import org.junit.Test
-
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
import scala.util.Random
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.zookeeper.data.Stat
+
class KafkaZkClientTest extends ZooKeeperTestHarness {
private val group = "my-group"
+ private val topic1 = "topic1"
+ private val topic2 = "topic2"
+
+ val topicPartition10 = new TopicPartition(topic1, 0)
+ val topicPartition11 = new TopicPartition(topic1, 1)
+ val topicPartition20 = new TopicPartition(topic2, 0)
+ val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
private val topicPartition = new TopicPartition("topic", 0)
@Test
@@ -90,17 +105,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testTopicAssignmentMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
// test with non-existing topic
+ assertFalse(zkClient.topicExists(topic1))
assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
+ val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- new TopicPartition(topic1, 0) -> Seq(0, 1),
+ topicPartition -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -108,6 +124,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// create a topic assignment
zkClient.createTopicAssignment(topic1, assignment)
+ assertTrue(zkClient.topicExists(topic1))
+
val expectedAssignment = assignment map { topicAssignment =>
val partition = topicAssignment._1.partition
val assignment = topicAssignment._2
@@ -215,6 +233,43 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testIsrChangeNotificationGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllIsrChangeNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000")))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+
+ assertEquals(Set("0000000000", "0000000001"), zkClient.getAllIsrChangeNotifications.toSet)
+
+ // A partition can have multiple notifications
+ assertEquals(Seq(topicPartition10, topicPartition11, topicPartition10),
+ zkClient.getPartitionsFromIsrChangeNotifications(Seq("0000000000", "0000000001")))
+ }
+
+ @Test
+ def testIsrChangeNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteIsrChangeNotifications(Seq("0000000000"))
+
+ zkClient.createRecursive("/isr_change_notification")
+
+ zkClient.propagateIsrChanges(Set(topicPartition10, topicPartition11))
+ zkClient.propagateIsrChanges(Set(topicPartition10))
+ zkClient.propagateIsrChanges(Set(topicPartition11))
+
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+ // Should not fail if called on a non-existent notification
+ zkClient.deleteIsrChangeNotifications(Seq("0000000001"))
+
+ assertEquals(Set("0000000000", "0000000002"), zkClient.getAllIsrChangeNotifications.toSet)
+ zkClient.deleteIsrChangeNotifications()
+ assertEquals(Seq.empty,zkClient.getAllIsrChangeNotifications)
+ }
+
+ @Test
def testPropagateLogDir(): Unit = {
zkClient.createRecursive("/log_dir_event_notification")
@@ -238,6 +293,52 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
+ def testLogDirGetters(): Unit = {
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ zkClient.propagateLogDirEvent(brokerId)
+
+ assertEquals(Seq(3), zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+
+ zkClient.propagateLogDirEvent(brokerId)
+
+ val anotherBrokerId = 4
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ val notifications012 = Seq("0000000000", "0000000001", "0000000002")
+ assertEquals(notifications012.toSet, zkClient.getAllLogDirEventNotifications.toSet)
+ assertEquals(Seq(3, 3, 4), zkClient.getBrokerIdsFromLogDirEvents(notifications012))
+ }
+
+ @Test
+ def testLogDirEventNotificationsDeletion(): Unit = {
+ // Should not fail even if parent node does not exist
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ zkClient.createRecursive("/log_dir_event_notification")
+
+ val brokerId = 3
+ val anotherBrokerId = 4
+
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(brokerId)
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications(Seq("0000000000", "0000000002"))
+
+ assertEquals(Seq("0000000001"), zkClient.getAllLogDirEventNotifications)
+
+ zkClient.propagateLogDirEvent(anotherBrokerId)
+
+ zkClient.deleteLogDirEventNotifications()
+ assertEquals(Seq.empty, zkClient.getAllLogDirEventNotifications)
+ }
+
+ @Test
def testSetGetAndDeletePartitionReassignment() {
zkClient.createRecursive(AdminZNode.path)
@@ -377,10 +478,23 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicPathMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testDeletePath() {
+ val path = "/a/b/c"
+ zkClient.createRecursive(path)
+ zkClient.deletePath(path)
+ assertFalse(zkClient.pathExists(path))
+ }
+ @Test
+ def testDeleteTopicZNode(): Unit ={
+ zkClient.deleteTopicZNode(topic1)
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.deleteTopicZNode(topic1)
+ assertFalse(zkClient.pathExists(TopicZNode.path(topic1)))
+ }
+
+ @Test
+ def testDeleteTopicPathMethods() {
assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
assertTrue(zkClient.getTopicDeletions.isEmpty)
@@ -394,17 +508,25 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
+ private def assertPathExistenceAndData(expectedPath: String, data: String){
+ assertTrue(zkClient.pathExists(expectedPath))
+ assertEquals(Some(data), dataAsString(expectedPath))
+ }
+
@Test
- def testEntityConfigManagementMethods() {
- val topic1 = "topic1"
- val topic2 = "topic2"
+ def testCreateTokenChangeNotification() {
+ intercept[NoNodeException] {
+ zkClient.createTokenChangeNotification("delegationToken")
+ }
+ zkClient.createDelegationTokenPaths()
- assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+ zkClient.createTokenChangeNotification("delegationToken")
+ assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ }
- val logProps = new Properties()
- logProps.put(LogConfig.SegmentBytesProp, "1024")
- logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
- logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ @Test
+ def testEntityConfigManagementMethods() {
+ assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
@@ -421,15 +543,399 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testBrokerRegistrationMethods() {
+ def testCreateConfigChangeNotification() {
+ intercept[NoNodeException] {
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+ }
+
+ zkClient.createTopLevelPaths()
+ zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
+
+ assertPathExistenceAndData(
+ s"/config/changes/config_change_0000000000",
+ """{"version":2,"entity_path":"/config/topics/topic1"}""")
+ }
+
+ private def createLogProps(bytesProp: Int) = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ logProps
+ }
+
+ private val logProps = createLogProps(1024)
+
+ @Test
+ def testGetLogConfigs() {
+ val emptyConfig = LogConfig(Collections.emptyMap())
+ assertEquals("Non existent config, no defaults",
+ (Map(topic1 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1), Collections.emptyMap()))
+
+ val logProps2 = createLogProps(2048)
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals("One existing and one non-existent topic",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> emptyConfig), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps2)
+ assertEquals("Two existing topics",
+ (Map(topic1 -> LogConfig(logProps), topic2 -> LogConfig(logProps2)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1, topic2), Collections.emptyMap()))
+
+ val logProps1WithMoreValues = createLogProps(1024)
+ logProps1WithMoreValues.put(LogConfig.SegmentJitterMsProp, "100")
+ logProps1WithMoreValues.put(LogConfig.SegmentBytesProp, "1024")
+
+ assertEquals("Config with defaults",
+ (Map(topic1 -> LogConfig(logProps1WithMoreValues)), Map.empty),
+ zkClient.getLogConfigs(Seq(topic1),
+ Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
+ }
+
+ private def createBrokerInfo(id: Int, host: String, port: Int,
+ securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+ BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
+ (securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
+
+ @Test
+ def testRegisterBrokerInfo() {
zkClient.createTopLevelPaths()
- val brokerInfo = BrokerInfo(Broker(1,
- Seq(new EndPoint("test.host", 9999, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT)),
- rack = None), ApiVersion.latestVersion, jmxPort = 9998)
+ val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ val differentBrokerInfoWithSameId = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
zkClient.registerBrokerInZk(brokerInfo)
assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ assertEquals("Other ZK clients can read broker info", Some(brokerInfo.broker), otherZkClient.getBroker(1))
+
+ // Node exists, owned by current session - no error, no update
+ zkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+
+ // Other client tries to register broker with same id causes failure, info is not changed in ZK
+ intercept[NodeExistsException] {
+ otherZkClient.registerBrokerInZk(differentBrokerInfoWithSameId)
+ }
+ assertEquals(Some(brokerInfo.broker), zkClient.getBroker(1))
+ }
+
+ @Test
+ def testGetBrokerMethods() {
+ zkClient.createTopLevelPaths()
+
+ assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
+ assertEquals(Seq.empty, zkClient.getSortedBrokerList())
+ assertEquals(None, zkClient.getBroker(0))
+
+ val brokerInfo0 = createBrokerInfo(0, "test.host0", 9998, SecurityProtocol.PLAINTEXT)
+ val brokerInfo1 = createBrokerInfo(1, "test.host1", 9999, SecurityProtocol.SSL)
+
+ zkClient.registerBrokerInZk(brokerInfo1)
+ otherZkClient.registerBrokerInZk(brokerInfo0)
+
+ assertEquals(Seq(0, 1), zkClient.getSortedBrokerList())
+ assertEquals(
+ Seq(brokerInfo0.broker, brokerInfo1.broker),
+ zkClient.getAllBrokersInCluster
+ )
+ assertEquals(Some(brokerInfo0.broker), zkClient.getBroker(0))
+ }
+
+ @Test
+ def testUpdateBrokerInfo() {
+ zkClient.createTopLevelPaths()
+
+ // Updating info of a broker not existing in ZK fails
+ val originalBrokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
+ intercept[NoNodeException]{
+ zkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ }
+
+ zkClient.registerBrokerInZk(originalBrokerInfo)
+
+ val updatedBrokerInfo = createBrokerInfo(1, "test.host2", 9995, SecurityProtocol.SSL)
+ zkClient.updateBrokerInfoInZk(updatedBrokerInfo)
+ assertEquals(Some(updatedBrokerInfo.broker), zkClient.getBroker(1))
+
+ // Other ZK clients can update info
+ otherZkClient.updateBrokerInfoInZk(originalBrokerInfo)
+ assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
+ }
+
+ private def statWithVersion(version: Int) = {
+ val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+ stat.setVersion(version)
+ stat
+ }
+
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
+
+ private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int) =
+ leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
+
+ private def checkUpdateLeaderAndIsrResult(
+ expectedSuccessfulPartitions: Map[TopicPartition, LeaderAndIsr],
+ expectedPartitionsToRetry: Seq[TopicPartition],
+ expectedFailedPartitions: Map[TopicPartition, (Class[_], String)],
+ actualUpdateLeaderAndIsrResult: UpdateLeaderAndIsrResult): Unit = {
+ val failedPartitionsExcerpt =
+ actualUpdateLeaderAndIsrResult.failedPartitions.mapValues(e => (e.getClass, e.getMessage))
+ assertEquals("Permanently failed updates do not match expected",
+ expectedFailedPartitions, failedPartitionsExcerpt)
+ assertEquals("Retriable updates (due to BADVERSION) do not match expected",
+ expectedPartitionsToRetry, actualUpdateLeaderAndIsrResult.partitionsToRetry)
+ assertEquals("Successful updates do not match expected",
+ expectedSuccessfulPartitions, actualUpdateLeaderAndIsrResult.successfulPartitions)
+ }
+
+ @Test
+ def testUpdateLeaderAndIsr() {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Non-existing topicPartitions
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ mutable.ArrayBuffer.empty,
+ Map(
+ topicPartition10 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/0/state"),
+ topicPartition11 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic1/partitions/1/state")),
+ zkClient.updateLeaderAndIsr(initialLeaderIsrs, controllerEpoch = 4))
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 1, zkVersion = 1),
+ mutable.ArrayBuffer.empty,
+ Map.empty,
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+ // Try to update with wrong ZK version
+ checkUpdateLeaderAndIsrResult(
+ Map.empty,
+ ArrayBuffer(topicPartition10, topicPartition11),
+ Map.empty,
+ zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+
+ // Trigger successful, to be retried and failed partitions in same call
+ val mixedState = Map(
+ topicPartition10 -> LeaderAndIsr(leader = 1, leaderEpoch = 2, isr = List(4, 5), zkVersion = 1),
+ topicPartition11 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0),
+ topicPartition20 -> LeaderAndIsr(leader = 0, leaderEpoch = 2, isr = List(3, 4), zkVersion = 0))
+
+ checkUpdateLeaderAndIsrResult(
+ leaderIsrs(state = 2, zkVersion = 2).filterKeys{_ == topicPartition10},
+ ArrayBuffer(topicPartition11),
+ Map(
+ topicPartition20 -> (classOf[NoNodeException], "KeeperErrorCode = NoNode for /brokers/topics/topic2/partitions/0/state")),
+ zkClient.updateLeaderAndIsr(mixedState, controllerEpoch = 4))
+ }
+
+ private def checkGetDataResponse(
+ leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
+ topicPartition: TopicPartition,
+ response: GetDataResponse) = {
+ val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
+ assertEquals(Code.OK, response.resultCode)
+ assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
+ assertEquals(Some(topicPartition), response.ctx)
+ assertEquals(
+ Some(leaderIsrAndControllerEpochs(topicPartition)),
+ TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
+ }
+
+ private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+
+ @Test
+ def testGetTopicsAndPartitions() {
+ assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+ zkClient.createRecursive(TopicZNode.path(topic2))
+ assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+
+ assertTrue(zkClient.getAllPartitions.isEmpty)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ assertEquals(Set(topicPartition10, topicPartition11), zkClient.getAllPartitions)
+ }
+
+ @Test
+ def testCreateAndGetTopicPartitionStatesRaw() {
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ assertEquals(
+ Seq(
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ TopicPartitionStateZNode.path(topicPartition10), ResponseMetadata(0, 0)),
+ CreateResponse(Code.OK, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ TopicPartitionStateZNode.path(topicPartition11), ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+ .map(eraseMetadata).toList)
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(initialLeaderIsrAndControllerEpochs, tp, r)}
+
+ // Trying to create existing topicPartition states fails
+ assertEquals(
+ Seq(
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition10), Some(topicPartition10),
+ null, ResponseMetadata(0, 0)),
+ CreateResponse(Code.NODEEXISTS, TopicPartitionStateZNode.path(topicPartition11), Some(topicPartition11),
+ null, ResponseMetadata(0, 0))),
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList)
+ }
+
+ @Test
+ def testSetTopicPartitionStatesRaw() {
+
+ def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
+ topicPartitions.map { topicPartition =>
+ SetDataResponse(resultCode, TopicPartitionStateZNode.path(topicPartition),
+ Some(topicPartition), stat, ResponseMetadata(0, 0))
+ }
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+ // Trying to set non-existing topicPartition's data results in NONODE responses
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.NONODE, null),
+ zkClient.setTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map {
+ _.copy(metadata = ResponseMetadata(0, 0))}.toList)
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
+
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
+ zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+ eraseMetadataAndStat}.toList)
+
+
+ val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
+ assertEquals(2, getResponses.size)
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+
+ // Other ZK client can also write the state of a partition
+ assertEquals(
+ expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
+ otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+ eraseMetadataAndStat}.toList)
+ }
+
+ @Test
+ def testReassignPartitionsInProgress() {
+ assertFalse(zkClient.reassignPartitionsInProgress)
+ zkClient.createRecursive(ReassignPartitionsZNode.path)
+ assertTrue(zkClient.reassignPartitionsInProgress)
+ }
+
+ @Test
+ def testGetTopicPartitionStates() {
+ assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
+ assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
+
+ zkClient.createRecursive(TopicZNode.path(topic1))
+
+
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+ assertEquals(
+ initialLeaderIsrAndControllerEpochs,
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
+ )
+
+ assertEquals(
+ Some(initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionState(topicPartition10)
+ )
+
+ assertEquals(Some(1), zkClient.getLeaderForPartition(topicPartition10))
+
+ val notExistingPartition = new TopicPartition(topic1, 2)
+ assertTrue(zkClient.getTopicPartitionStates(Seq(notExistingPartition)).isEmpty)
+ assertEquals(
+ Map(topicPartition10 -> initialLeaderIsrAndControllerEpochs(topicPartition10)),
+ zkClient.getTopicPartitionStates(Seq(topicPartition10, notExistingPartition))
+ )
+
+ assertEquals(None, zkClient.getTopicPartitionState(notExistingPartition))
+ assertEquals(None, zkClient.getLeaderForPartition(notExistingPartition))
+
+ }
+
+ private def eraseMetadataAndStat(response: SetDataResponse) = {
+ val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
+ response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
+ }
+
+ @Test
+ def testControllerEpochMethods() {
+ assertEquals(None, zkClient.getControllerEpoch)
+
+ assertEquals("Setting non existing nodes should return NONODE results",
+ SetDataResponse(Code.NONODE, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+
+ assertEquals("Creating non existing nodes is OK",
+ CreateResponse(Code.OK, ControllerEpochZNode.path, None, ControllerEpochZNode.path, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+ assertEquals(0, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Attemt to create existing nodes should return NODEEXISTS",
+ CreateResponse(Code.NODEEXISTS, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadata(zkClient.createControllerEpochRaw(0)))
+
+ assertEquals("Updating existing nodes is OK",
+ SetDataResponse(Code.OK, ControllerEpochZNode.path, None, statWithVersion(1), ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ assertEquals(1, zkClient.getControllerEpoch.get._1)
+
+ assertEquals("Updating with wrong ZK version returns BADVERSION",
+ SetDataResponse(Code.BADVERSION, ControllerEpochZNode.path, None, null, ResponseMetadata(0, 0)),
+ eraseMetadataAndStat(zkClient.setControllerEpochRaw(1, 0)))
+ }
+
+ @Test
+ def testControllerManagementMethods() {
+ // No controller
+ assertEquals(None, zkClient.getControllerId)
+ // Create controller
+ zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(brokerId = 1, timestamp = 123456))
+ assertEquals(Some(1), zkClient.getControllerId)
+ zkClient.deleteController()
+ assertEquals(None, zkClient.getControllerId)
+ }
+
+ @Test
+ def testZNodeChangeHandlerForDataChange(): Unit = {
+ val mockPath = "/foo"
+
+ val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
+ val zNodeChangeHandler = new ZNodeChangeHandler {
+ override def handleCreation(): Unit = {
+ znodeChangeHandlerCountDownLatch.countDown()
+ }
+
+ override val path: String = mockPath
+ }
+
+ zkClient.registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler)
+ zkClient.createRecursive(mockPath)
+ assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
}
@Test
@@ -458,7 +964,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPreferredReplicaElection.isEmpty)
- val topic1 = "topic1"
val electionPartitions = Set(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))
zkClient.createPreferredReplicaElection(electionPartitions)
@@ -498,6 +1003,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
// test non-existent token
assertTrue(zkClient.getDelegationTokenInfo(tokenId).isEmpty)
+ assertFalse(zkClient.deleteDelegationToken(tokenId))
// create a token
zkClient.setOrCreateDelegationToken(token)
@@ -511,5 +1017,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
//test updated token
assertEquals(tokenInfo, zkClient.getDelegationTokenInfo(tokenId).get)
+
+ //test deleting token
+ assertTrue(zkClient.deleteDelegationToken(tokenId))
+ assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index a122297..f9cb8e3 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
-
import scala.collection.Set
import scala.collection.JavaConverters._
+
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,6 +45,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
+ var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
@@ -55,15 +56,22 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
- zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ zkClient = createZkClient
+ otherZkClient = createZkClient
adminZkClient = new AdminZkClient(zkClient)
}
+ protected def createZkClient = {
+ KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
+ zkClient.close()
+ if (otherZkClient != null)
+ otherZkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 05/09: Undo unintentional whitespace change in
ZooKeeperTestHarness
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 155ac5903aec0f682e19a0f1f17c1091f60c6da5
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 10:36:20 2018 +0100
Undo unintentional whitespace change in ZooKeeperTestHarness
---
core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index af2d53a..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -51,7 +51,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
def zkPort: Int = zookeeper.port
def zkConnect: String = s"127.0.0.1:$zkPort"
-
+
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 07/09: Minor fixes requested by reviewer
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1ae4c1d6095a86266ba6d008711570df0eaf5682
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 21:21:44 2018 +0100
Minor fixes requested by reviewer
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 5cbb76e..d6826a6 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -1039,4 +1039,4 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.deleteDelegationToken(tokenId))
assertEquals(None, zkClient.getDelegationTokenInfo(tokenId))
}
-}
\ No newline at end of file
+}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 06/09: Fixes requested by reviewer
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ae51a15026e0dc10ef2df4bcafc85807dc8b4eb6
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 21:19:38 2018 +0100
Fixes requested by reviewer
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 28dbb73..5cbb76e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,7 +19,6 @@ package kafka.zk
import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import javax.security.auth.login.Configuration
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
@@ -57,7 +56,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val topicPartition20 = new TopicPartition(topic2, 0)
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
- var otherZkClient: KafkaZkClient = null
+ var otherZkClient: KafkaZkClient = _
@Before
override def setUp() {
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 08/09: Changes requested by reviewer
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cacd377933ae0e7da0ed08dd33ab69fabde073c5
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Wed Feb 28 20:57:57 2018 -0800
Changes requested by reviewer
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 103 +++++++++++----------
2 files changed, 54 insertions(+), 52 deletions(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 851c686..d61b281 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -89,8 +89,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
val response = retryRequestUntilConnected(setDataRequest)
- if (response.resultCode != Code.OK)
- throw KeeperException.create(response.resultCode)
+ response.maybeThrow()
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d6826a6..9590ecd 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -59,14 +59,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
var otherZkClient: KafkaZkClient = _
@Before
- override def setUp() {
+ override def setUp(): Unit = {
super.setUp()
otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
}
@After
- override def tearDown() {
+ override def tearDown(): Unit = {
if (otherZkClient != null)
otherZkClient.close()
super.tearDown()
@@ -131,9 +131,8 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
- val topicPartition = new TopicPartition(topic1, 0)
val assignment = Map(
- topicPartition -> Seq(0, 1),
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
new TopicPartition(topic1, 1) -> Seq(0, 1),
new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
@@ -311,8 +310,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
@Test
def testLogDirGetters(): Unit = {
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getAllLogDirEventNotifications)
- assertEquals("Failed for non existing parent ZK node", Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
+ assertEquals("getAllLogDirEventNotifications failed for non existing parent ZK node",
+ Seq.empty, zkClient.getAllLogDirEventNotifications)
+ assertEquals("getBrokerIdsFromLogDirEvents failed for non existing parent ZK node",
+ Seq.empty, zkClient.getBrokerIdsFromLogDirEvents(Seq("0000000000")))
zkClient.createRecursive("/log_dir_event_notification")
@@ -495,7 +496,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeletePath() {
+ def testDeletePath(): Unit = {
val path = "/a/b/c"
zkClient.createRecursive(path)
zkClient.deletePath(path)
@@ -503,7 +504,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testDeleteTopicZNode(): Unit ={
+ def testDeleteTopicZNode(): Unit = {
zkClient.deleteTopicZNode(topic1)
zkClient.createRecursive(TopicZNode.path(topic1))
zkClient.deleteTopicZNode(topic1)
@@ -525,20 +526,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertTrue(zkClient.getTopicDeletions.isEmpty)
}
- private def assertPathExistenceAndData(expectedPath: String, data: String){
+ private def assertPathExistenceAndData(expectedPath: String, data: String): Unit = {
assertTrue(zkClient.pathExists(expectedPath))
assertEquals(Some(data), dataAsString(expectedPath))
}
@Test
- def testCreateTokenChangeNotification() {
+ def testCreateTokenChangeNotification(): Unit = {
intercept[NoNodeException] {
zkClient.createTokenChangeNotification("delegationToken")
}
zkClient.createDelegationTokenPaths()
zkClient.createTokenChangeNotification("delegationToken")
- assertPathExistenceAndData(s"/delegation_token/token_changes/token_change_0000000000", "delegationToken")
+ assertPathExistenceAndData("/delegation_token/token_changes/token_change_0000000000", "delegationToken")
}
@Test
@@ -560,7 +561,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateConfigChangeNotification() {
+ def testCreateConfigChangeNotification(): Unit = {
intercept[NoNodeException] {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
}
@@ -569,11 +570,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createConfigChangeNotification(ConfigEntityZNode.path(ConfigType.Topic, topic1))
assertPathExistenceAndData(
- s"/config/changes/config_change_0000000000",
+ "/config/changes/config_change_0000000000",
"""{"version":2,"entity_path":"/config/topics/topic1"}""")
}
- private def createLogProps(bytesProp: Int) = {
+ private def createLogProps(bytesProp: Int): Properties = {
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, bytesProp.toString)
logProps.put(LogConfig.SegmentIndexBytesProp, bytesProp.toString)
@@ -584,7 +585,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private val logProps = createLogProps(1024)
@Test
- def testGetLogConfigs() {
+ def testGetLogConfigs(): Unit = {
val emptyConfig = LogConfig(Collections.emptyMap())
assertEquals("Non existent config, no defaults",
(Map(topic1 -> emptyConfig), Map.empty),
@@ -612,13 +613,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
Map[String, AnyRef](LogConfig.SegmentJitterMsProp -> "100", LogConfig.SegmentBytesProp -> "128").asJava))
}
- private def createBrokerInfo(id: Int, host: String, port: Int,
- securityProtocol: SecurityProtocol, rack: Option[String] = None) =
+ private def createBrokerInfo(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol,
+ rack: Option[String] = None): BrokerInfo =
BrokerInfo(Broker(id, Seq(new EndPoint(host, port, ListenerName.forSecurityProtocol
(securityProtocol), securityProtocol)), rack = rack), ApiVersion.latestVersion, jmxPort = port + 10)
@Test
- def testRegisterBrokerInfo() {
+ def testRegisterBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
val brokerInfo = createBrokerInfo(1, "test.host", 9999, SecurityProtocol.PLAINTEXT)
@@ -640,7 +641,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testGetBrokerMethods() {
+ def testGetBrokerMethods(): Unit = {
zkClient.createTopLevelPaths()
assertEquals(Seq.empty,zkClient.getAllBrokersInCluster)
@@ -662,7 +663,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateBrokerInfo() {
+ def testUpdateBrokerInfo(): Unit = {
zkClient.createTopLevelPaths()
// Updating info of a broker not existing in ZK fails
@@ -682,26 +683,28 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(Some(originalBrokerInfo.broker), otherZkClient.getBroker(1))
}
- private def statWithVersion(version: Int) = {
+ private def statWithVersion(version: Int): Stat = {
val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
stat.setVersion(version)
stat
}
- private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int) = Map(
- topicPartition10 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
- controllerEpoch = 4),
- topicPartition11 -> LeaderIsrAndControllerEpoch(
- LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
- controllerEpoch = 4))
-
- private def initialLeaderIsrAndControllerEpochs = leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int) = leaderIsrAndControllerEpochs(state, state - 1)
-
- private def initialLeaderIsrs = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
- private def updatedLeaderIsrs(state: Int) = updatedLeaderIsrAndControllerEpochs(state).mapValues(_.leaderAndIsr)
- private def leaderIsrs(state: Int, zkVersion: Int) =
+ private def leaderIsrAndControllerEpochs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ Map(
+ topicPartition10 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 1, leaderEpoch = state, isr = List(2 + state, 3 + state), zkVersion = zkVersion),
+ controllerEpoch = 4),
+ topicPartition11 -> LeaderIsrAndControllerEpoch(
+ LeaderAndIsr(leader = 0, leaderEpoch = state + 1, isr = List(1 + state, 2 + state), zkVersion = zkVersion),
+ controllerEpoch = 4))
+
+ val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(0, 0)
+ private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
+ leaderIsrAndControllerEpochs(state, state - 1)
+
+ val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
+ private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
leaderIsrAndControllerEpochs(state, zkVersion).mapValues(_.leaderAndIsr)
private def checkUpdateLeaderAndIsrResult(
@@ -720,7 +723,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testUpdateLeaderAndIsr() {
+ def testUpdateLeaderAndIsr(): Unit = {
zkClient.createRecursive(TopicZNode.path(topic1))
// Non-existing topicPartitions
@@ -738,14 +741,14 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
leaderIsrs(state = 1, zkVersion = 1),
mutable.ArrayBuffer.empty,
Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
// Try to update with wrong ZK version
checkUpdateLeaderAndIsrResult(
Map.empty,
ArrayBuffer(topicPartition10, topicPartition11),
Map.empty,
- zkClient.updateLeaderAndIsr(updatedLeaderIsrs(1),controllerEpoch = 4))
+ zkClient.updateLeaderAndIsr(leaderIsrs(state = 1, zkVersion = 0),controllerEpoch = 4))
// Trigger successful, to be retried and failed partitions in same call
val mixedState = Map(
@@ -764,7 +767,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
private def checkGetDataResponse(
leaderIsrAndControllerEpochs: Map[TopicPartition,LeaderIsrAndControllerEpoch],
topicPartition: TopicPartition,
- response: GetDataResponse) = {
+ response: GetDataResponse): Unit = {
val zkVersion = leaderIsrAndControllerEpochs(topicPartition).leaderAndIsr.zkVersion
assertEquals(Code.OK, response.resultCode)
assertEquals(TopicPartitionStateZNode.path(topicPartition), response.path)
@@ -774,10 +777,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
TopicPartitionStateZNode.decode(response.data, statWithVersion(zkVersion)))
}
- private def eraseMetadata(response: CreateResponse) = response.copy(metadata = ResponseMetadata(0, 0))
+ private def eraseMetadata(response: CreateResponse): CreateResponse =
+ response.copy(metadata = ResponseMetadata(0, 0))
@Test
- def testGetTopicsAndPartitions() {
+ def testGetTopicsAndPartitions(): Unit = {
assertTrue(zkClient.getAllTopicsInCluster.isEmpty)
assertTrue(zkClient.getAllPartitions.isEmpty)
@@ -792,7 +796,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testCreateAndGetTopicPartitionStatesRaw() {
+ def testCreateAndGetTopicPartitionStatesRaw(): Unit = {
zkClient.createRecursive(TopicZNode.path(topic1))
assertEquals(
@@ -819,7 +823,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testSetTopicPartitionStatesRaw() {
+ def testSetTopicPartitionStatesRaw(): Unit = {
def expectedSetDataResponses(topicPartitions: TopicPartition*)(resultCode: Code, stat: Stat) =
topicPartitions.map { topicPartition =>
@@ -855,21 +859,20 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testReassignPartitionsInProgress() {
+ def testReassignPartitionsInProgress(): Unit = {
assertFalse(zkClient.reassignPartitionsInProgress)
zkClient.createRecursive(ReassignPartitionsZNode.path)
assertTrue(zkClient.reassignPartitionsInProgress)
}
@Test
- def testGetTopicPartitionStates() {
+ def testGetTopicPartitionStates(): Unit = {
assertEquals(None, zkClient.getTopicPartitionState(topicPartition10))
assertEquals(None, zkClient.getLeaderForPartition(topicPartition10))
zkClient.createRecursive(TopicZNode.path(topic1))
-
- zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs).map(eraseMetadata).toList
+ zkClient.createTopicPartitionStatesRaw(initialLeaderIsrAndControllerEpochs)
assertEquals(
initialLeaderIsrAndControllerEpochs,
zkClient.getTopicPartitionStates(Seq(topicPartition10, topicPartition11))
@@ -894,13 +897,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
- private def eraseMetadataAndStat(response: SetDataResponse) = {
+ private def eraseMetadataAndStat(response: SetDataResponse): SetDataResponse = {
val stat = if (response.stat != null) statWithVersion(response.stat.getVersion) else null
response.copy(metadata = ResponseMetadata(0, 0), stat = stat)
}
@Test
- def testControllerEpochMethods() {
+ def testControllerEpochMethods(): Unit = {
assertEquals(None, zkClient.getControllerEpoch)
assertEquals("Setting non existing nodes should return NONODE results",
@@ -927,7 +930,7 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
}
@Test
- def testControllerManagementMethods() {
+ def testControllerManagementMethods(): Unit = {
// No controller
assertEquals(None, zkClient.getControllerId)
// Create controller
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 09/09: removed method updatedLeaderIsrAndControllerEpochs
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 8100fd92afd6c4615bf7a6998a8a8eeb0d628d4e
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Thu Mar 1 14:47:47 2018 -0800
removed method updatedLeaderIsrAndControllerEpochs
---
core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9590ecd..e44c2c9 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -700,8 +700,6 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val initialLeaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch] =
leaderIsrAndControllerEpochs(0, 0)
- private def updatedLeaderIsrAndControllerEpochs(state: Int): Map[TopicPartition, LeaderIsrAndControllerEpoch] =
- leaderIsrAndControllerEpochs(state, state - 1)
val initialLeaderIsrs: Map[TopicPartition, LeaderAndIsr] = initialLeaderIsrAndControllerEpochs.mapValues(_.leaderAndIsr)
private def leaderIsrs(state: Int, zkVersion: Int): Map[TopicPartition, LeaderAndIsr] =
@@ -843,18 +841,18 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(1)),
- zkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(1)).map {
+ zkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0)).map {
eraseMetadataAndStat}.toList)
val getResponses = zkClient.getTopicPartitionStatesRaw(topicPartitions10_11)
assertEquals(2, getResponses.size)
- topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(updatedLeaderIsrAndControllerEpochs(1), tp, r)}
+ topicPartitions10_11.zip(getResponses) foreach {case (tp, r) => checkGetDataResponse(leaderIsrAndControllerEpochs(state = 1, zkVersion = 0), tp, r)}
// Other ZK client can also write the state of a partition
assertEquals(
expectedSetDataResponses(topicPartition10, topicPartition11)(Code.OK, statWithVersion(2)),
- otherZkClient.setTopicPartitionStatesRaw(updatedLeaderIsrAndControllerEpochs(2)).map {
+ otherZkClient.setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs(state = 2, zkVersion = 1)).map {
eraseMetadataAndStat}.toList)
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 01/09: Fix in updateBrokerInfoInZk,
exception is thrown if response was not OK.
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 010310388725d6393a73e12c02dff4bb85cf2518
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Mon Feb 19 16:49:15 2018 +0100
Fix in updateBrokerInfoInZk, exception is thrown if response was not OK.
---
core/src/main/scala/kafka/zk/KafkaZkClient.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 6545fde..145e294 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -88,7 +88,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
val brokerIdPath = brokerInfo.path
val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
- retryRequestUntilConnected(setDataRequest)
+ val response = retryRequestUntilConnected(setDataRequest)
+ if (response.resultCode != Code.OK)
+ throw KeeperException.create(response.resultCode)
info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
}
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.
[kafka] 04/09: Move otherZkClient to KafkaZkClientTest
Posted by ju...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 54a32205b0fa7545d450104bc28aacf153b70cf6
Author: Sandor Murakozi <sm...@gmail.com>
AuthorDate: Tue Feb 20 09:03:56 2018 +0100
Move otherZkClient to KafkaZkClientTest
---
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 22 ++++++++++++++++++++--
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 20 ++++++--------------
2 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 9329430..28dbb73 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -19,6 +19,7 @@ package kafka.zk
import java.util.{Collections, Properties, UUID}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.{CountDownLatch, TimeUnit}
+import javax.security.auth.login.Configuration
import kafka.api.{ApiVersion, LeaderAndIsr}
import kafka.cluster.{Broker, EndPoint}
@@ -30,10 +31,10 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
import org.junit.Assert._
-import org.junit.Test
+import org.junit.{After, Before, Test}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Seq, mutable}
@@ -42,6 +43,7 @@ import scala.util.Random
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.data.Stat
class KafkaZkClientTest extends ZooKeeperTestHarness {
@@ -55,6 +57,22 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
val topicPartition20 = new TopicPartition(topic2, 0)
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+ var otherZkClient: KafkaZkClient = null
+
+ @Before
+ override def setUp() {
+ super.setUp()
+ otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ }
+
+ @After
+ override def tearDown() {
+ if (otherZkClient != null)
+ otherZkClient.close()
+ super.tearDown()
+ }
+
private val topicPartition = new TopicPartition("topic", 0)
@Test
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index f9cb8e3..af2d53a 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,16 +19,16 @@ package kafka.zk
import javax.security.auth.login.Configuration
-import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.junit.{After, AfterClass, Before, BeforeClass}
import org.junit.Assert._
import org.scalatest.junit.JUnitSuite
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.test.IntegrationTest
import org.junit.experimental.categories.Category
+
import scala.collection.Set
import scala.collection.JavaConverters._
-
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import kafka.controller.ControllerEventManager
@@ -45,33 +45,25 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
protected val zkAclsEnabled: Option[Boolean] = None
var zkClient: KafkaZkClient = null
- var otherZkClient: KafkaZkClient = null
var adminZkClient: AdminZkClient = null
var zookeeper: EmbeddedZookeeper = null
def zkPort: Int = zookeeper.port
def zkConnect: String = s"127.0.0.1:$zkPort"
-
+
@Before
def setUp() {
zookeeper = new EmbeddedZookeeper()
- zkClient = createZkClient
- otherZkClient = createZkClient
- adminZkClient = new AdminZkClient(zkClient)
- }
-
- protected def createZkClient = {
- KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+ zkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+ adminZkClient = new AdminZkClient(zkClient)
}
@After
def tearDown() {
if (zkClient != null)
- zkClient.close()
- if (otherZkClient != null)
- otherZkClient.close()
+ zkClient.close()
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown(), this)
Configuration.setConfiguration(null)
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.