You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/11/22 21:25:57 UTC
[1/3] kafka git commit: KAFKA-5646;
Use KafkaZkClient in DynamicConfigManager and AdminManager
Repository: kafka
Updated Branches:
refs/heads/trunk c5f31fe38 -> bc852baff
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
index 2d672b6..a44f900 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala
@@ -20,7 +20,6 @@ package kafka.server.epoch
import java.io.{File, RandomAccessFile}
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.api.KAFKA_0_11_0_IV1
import kafka.log.Log
import kafka.server.KafkaConfig._
@@ -78,7 +77,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
brokers = (100 to 101).map(createBroker(_))
//A single partition topic with 2 replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
producer = createProducer()
val tp = new TopicPartition(topic, 0)
@@ -139,7 +138,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
brokers = (100 to 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
//A single partition topic with 2 replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
0 -> Seq(100, 101)
))
producer = createProducer()
@@ -189,7 +188,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
brokers = (100 to 101).map(createBroker(_))
//A single partition topic with 2 replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
0 -> Seq(100, 101)
))
producer = bufferingProducer()
@@ -266,7 +265,7 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness
brokers = (100 to 101).map(createBroker(_))
//A single partition topic with 2 replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
producer = createProducer()
//Kick off with a single record
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 89c12fe..eaafb2a 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -18,7 +18,6 @@ package kafka.server.epoch
import java.util.{Map => JMap}
-import kafka.admin.AdminUtils
import kafka.server.KafkaConfig._
import kafka.server.{BlockingSend, KafkaServer, ReplicaFetcherBlockingSend}
import kafka.utils.TestUtils._
@@ -96,11 +95,11 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
//3 brokers, put partition on 100/101 and then pretend to be 102
brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic1, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic1, Map(
0 -> Seq(100),
1 -> Seq(101)
))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic2, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic2, Map(
0 -> Seq(100)
))
@@ -144,7 +143,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
//Setup: we are only interested in the single partition on broker 101
brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, tp.topic, Map(tp.partition -> Seq(101)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, Map(tp.partition -> Seq(101)))
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
//1. Given a single message
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
new file mode 100644
index 0000000..993dcf2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import java.util
+import java.util.Properties
+
+import kafka.log._
+import kafka.server.DynamicConfig.Broker._
+import kafka.server.KafkaConfig._
+import kafka.server.{ConfigType, KafkaConfig, KafkaServer}
+import kafka.utils.CoreUtils._
+import kafka.utils.TestUtils._
+import kafka.utils.{Logging, TestUtils}
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
+import org.apache.kafka.common.metrics.Quota
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{After, Test}
+
+import scala.collection.JavaConverters._
+import scala.collection.{Map, immutable}
+
+class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
+
+ var servers: Seq[KafkaServer] = Seq()
+
+ @After
+ override def tearDown() {
+ TestUtils.shutdownServers(servers)
+ super.tearDown()
+ }
+
+ @Test
+ def testManualReplicaAssignment() {
+ val brokers = List(0, 1, 2, 3, 4)
+ TestUtils.createBrokersInZk(zkUtils, brokers)
+
+ // duplicate brokers
+ intercept[InvalidReplicaAssignmentException] {
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0->Seq(0,0)))
+ }
+
+ // inconsistent replication factor
+ intercept[InvalidReplicaAssignmentException] {
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", Map(0->Seq(0,1), 1->Seq(0)))
+ }
+
+ // good assignment
+ val assignment = Map(0 -> List(0, 1, 2),
+ 1 -> List(1, 2, 3))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK("test", assignment)
+ val found = zkClient.getPartitionAssignmentForTopics(Set("test"))
+ assertEquals(assignment, found("test"))
+ }
+
+ @Test
+ def testTopicCreationInZK() {
+ val expectedReplicaAssignment = Map(
+ 0 -> List(0, 1, 2),
+ 1 -> List(1, 2, 3),
+ 2 -> List(2, 3, 4),
+ 3 -> List(3, 4, 0),
+ 4 -> List(4, 0, 1),
+ 5 -> List(0, 2, 3),
+ 6 -> List(1, 3, 4),
+ 7 -> List(2, 4, 0),
+ 8 -> List(3, 0, 1),
+ 9 -> List(4, 1, 2),
+ 10 -> List(1, 2, 3),
+ 11 -> List(1, 3, 4)
+ )
+ val leaderForPartitionMap = immutable.Map(
+ 0 -> 0,
+ 1 -> 1,
+ 2 -> 2,
+ 3 -> 3,
+ 4 -> 4,
+ 5 -> 0,
+ 6 -> 1,
+ 7 -> 2,
+ 8 -> 3,
+ 9 -> 4,
+ 10 -> 1,
+ 11 -> 1
+ )
+ val topic = "test"
+ TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
+ // create the topic
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+ // create leaders for all partitions
+ TestUtils.makeLeaderForPartition(zkUtils, topic, leaderForPartitionMap, 1)
+ val actualReplicaMap = leaderForPartitionMap.keys.map(p => p -> zkClient.getReplicasForPartition(new TopicPartition(topic, p))).toMap
+ assertEquals(expectedReplicaAssignment.size, actualReplicaMap.size)
+ for(i <- 0 until actualReplicaMap.size)
+ assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaMap(i))
+
+ intercept[TopicExistsException] {
+ // shouldn't be able to create a topic that already exists
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+ }
+ }
+
+ @Test
+ def testTopicCreationWithCollision() {
+ val topic = "test.topic"
+ val collidingTopic = "test_topic"
+ TestUtils.createBrokersInZk(zkUtils, List(0, 1, 2, 3, 4))
+ // create the topic
+ adminZkClient.createTopic(topic, 3, 1)
+
+ intercept[InvalidTopicException] {
+ // shouldn't be able to create a topic that collides
+ adminZkClient.createTopic(collidingTopic, 3, 1)
+ }
+ }
+
+ @Test
+ def testConcurrentTopicCreation() {
+ val topic = "test.topic"
+
+ // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
+ val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
+ EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
+ EasyMock.replay(zkMock)
+ val adminZkClient = new AdminZkClient(zkMock)
+
+ intercept[TopicExistsException] {
+ adminZkClient.validateCreateOrUpdateTopic(topic, Map.empty, new Properties, update = false)
+ }
+ }
+
+ /**
+ * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
+ * then changes the config and checks that the new values take effect.
+ */
+ @Test
+ def testTopicConfigChange() {
+ val partitions = 3
+ val topic = "my-topic"
+ val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ servers = Seq(server)
+
+ def makeConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String) = {
+ val props = new Properties()
+ props.setProperty(LogConfig.MaxMessageBytesProp, messageSize.toString)
+ props.setProperty(LogConfig.RetentionMsProp, retentionMs.toString)
+ props.setProperty(LogConfig.LeaderReplicationThrottledReplicasProp, throttledLeaders)
+ props.setProperty(LogConfig.FollowerReplicationThrottledReplicasProp, throttledFollowers)
+ props
+ }
+
+ def checkConfig(messageSize: Int, retentionMs: Long, throttledLeaders: String, throttledFollowers: String, quotaManagerIsThrottled: Boolean) {
+ def checkList(actual: util.List[String], expected: String): Unit = {
+ assertNotNull(actual)
+ if (expected == "")
+ assertTrue(actual.isEmpty)
+ else
+ assertEquals(expected.split(",").toSeq, actual.asScala)
+ }
+ TestUtils.retry(10000) {
+ for (part <- 0 until partitions) {
+ val tp = new TopicPartition(topic, part)
+ val log = server.logManager.getLog(tp)
+ assertTrue(log.isDefined)
+ assertEquals(retentionMs, log.get.config.retentionMs)
+ assertEquals(messageSize, log.get.config.maxMessageSize)
+ checkList(log.get.config.LeaderReplicationThrottledReplicas, throttledLeaders)
+ checkList(log.get.config.FollowerReplicationThrottledReplicas, throttledFollowers)
+ assertEquals(quotaManagerIsThrottled, server.quotaManagers.leader.isThrottled(tp))
+ }
+ }
+ }
+
+ // create a topic with a few config overrides and check that they are applied
+ val maxMessageSize = 1024
+ val retentionMs = 1000 * 1000
+ adminZkClient.createTopic(topic, partitions, 1, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+
+ //Standard topic configs will be propagated at topic creation time, but the quota manager will not have been updated.
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", false)
+
+ //Update dynamically and all properties should be applied
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", true)
+
+ // now double the config values for the topic and check that it is applied
+ val newConfig = makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*")
+ adminZkClient.changeTopicConfig(topic, makeConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*"))
+ checkConfig(2 * maxMessageSize, 2 * retentionMs, "*", "*", quotaManagerIsThrottled = true)
+
+ // Verify that the same config can be read from ZK
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
+ assertEquals(newConfig, configInZk)
+
+ //Now delete the config
+ adminZkClient.changeTopicConfig(topic, new Properties)
+ checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+
+ //Add config back
+ adminZkClient.changeTopicConfig(topic, makeConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1"))
+ checkConfig(maxMessageSize, retentionMs, "0:0,1:0,2:0", "0:1,1:1,2:1", quotaManagerIsThrottled = true)
+
+ //Now ensure updating to "" removes the throttled replica list also
+ adminZkClient.changeTopicConfig(topic, propsWith((LogConfig.FollowerReplicationThrottledReplicasProp, ""), (LogConfig.LeaderReplicationThrottledReplicasProp, "")))
+ checkConfig(Defaults.MaxMessageSize, Defaults.RetentionMs, "", "", quotaManagerIsThrottled = false)
+ }
+
+ @Test
+ def shouldPropagateDynamicBrokerConfigs() {
+ val brokerIds = Seq(0, 1, 2)
+ servers = createBrokerConfigs(3, zkConnect).map(fromProps).map(createServer(_))
+
+ def checkConfig(limit: Long) {
+ retry(10000) {
+ for (server <- servers) {
+ assertEquals("Leader Quota Manager was not updated", limit, server.quotaManagers.leader.upperBound)
+ assertEquals("Follower Quota Manager was not updated", limit, server.quotaManagers.follower.upperBound)
+ }
+ }
+ }
+
+ val limit: Long = 1000000
+
+ // Set the limit & check it is applied to the log
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
+ (LeaderReplicationThrottledRateProp, limit.toString),
+ (FollowerReplicationThrottledRateProp, limit.toString)))
+ checkConfig(limit)
+
+ // Now double the config values for the topic and check that it is applied
+ val newLimit = 2 * limit
+ adminZkClient.changeBrokerConfig(brokerIds, propsWith(
+ (LeaderReplicationThrottledRateProp, newLimit.toString),
+ (FollowerReplicationThrottledRateProp, newLimit.toString)))
+ checkConfig(newLimit)
+
+ // Verify that the same config can be read from ZK
+ for (brokerId <- brokerIds) {
+ val configInZk = adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)
+ assertEquals(newLimit, configInZk.getProperty(LeaderReplicationThrottledRateProp).toInt)
+ assertEquals(newLimit, configInZk.getProperty(FollowerReplicationThrottledRateProp).toInt)
+ }
+
+ //Now delete the config
+ adminZkClient.changeBrokerConfig(brokerIds, new Properties)
+ checkConfig(DefaultReplicationThrottledRate)
+ }
+
+ /**
+ * This test simulates a client config change in ZK whose notification has been purged.
+ * Basically, it asserts that notifications are bootstrapped from ZK
+ */
+ @Test
+ def testBootstrapClientIdConfig() {
+ val clientId = "my-client"
+ val props = new Properties()
+ props.setProperty("producer_byte_rate", "1000")
+ props.setProperty("consumer_byte_rate", "2000")
+
+ // Write config without notification to ZK.
+ zkClient.setOrCreateEntityConfigs(ConfigType.Client, clientId, props)
+
+ val configInZk: Map[String, Properties] = adminZkClient.fetchAllEntityConfigs(ConfigType.Client)
+ assertEquals("Must have 1 overriden client config", 1, configInZk.size)
+ assertEquals(props, configInZk(clientId))
+
+ // Test that the existing clientId overrides are read
+ val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
+ servers = Seq(server)
+ assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+ }
+
+ @Test
+ def testGetBrokerMetadatas() {
+ // broker 4 has no rack information
+ val brokerList = 0 to 5
+ val rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 5 -> "rack3")
+ val brokerMetadatas = toBrokerMetadata(rackInfo, brokersWithoutRack = brokerList.filterNot(rackInfo.keySet))
+ TestUtils.createBrokersInZk(brokerMetadatas, zkUtils)
+
+ val processedMetadatas1 = adminZkClient.getBrokerMetadatas(RackAwareMode.Disabled)
+ assertEquals(brokerList, processedMetadatas1.map(_.id))
+ assertEquals(List.fill(brokerList.size)(None), processedMetadatas1.map(_.rack))
+
+ val processedMetadatas2 = adminZkClient.getBrokerMetadatas(RackAwareMode.Safe)
+ assertEquals(brokerList, processedMetadatas2.map(_.id))
+ assertEquals(List.fill(brokerList.size)(None), processedMetadatas2.map(_.rack))
+
+ intercept[AdminOperationException] {
+ adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
+ }
+
+ val partialList = List(0, 1, 2, 3, 5)
+ val processedMetadatas3 = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced, Some(partialList))
+ assertEquals(partialList, processedMetadatas3.map(_.id))
+ assertEquals(partialList.map(rackInfo), processedMetadatas3.flatMap(_.rack))
+
+ val numPartitions = 3
+ adminZkClient.createTopic("foo", numPartitions, 2, rackAwareMode = RackAwareMode.Safe)
+ val assignment = zkClient.getReplicaAssignmentForTopics(Set("foo"))
+ assertEquals(numPartitions, assignment.size)
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 347569d..28d8430 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,9 +16,11 @@
*/
package kafka.zk
-import java.util.UUID
+import java.util.{Properties, UUID}
+import kafka.log.LogConfig
import kafka.security.auth._
+import kafka.server.ConfigType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
@@ -70,30 +72,57 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.createRecursive("/create/some/random/long/path")
assertTrue(zkClient.pathExists("/create/some/random/long/path"))
- zkClient.createRecursive("/create/some/random/long/path") // no errors if path already exists
+ zkClient.createRecursive("/create/some/random/long/path", throwIfPathExists = false) // no errors if path already exists
intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path"))
}
@Test
- def testGetTopicPartitionCount() {
- val topic = "mytest"
+ def testTopicAssignmentMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
// test with non-existing topic
- assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty)
-
- // create a topic path
- zkClient.createRecursive(TopicZNode.path(topic))
+ assertTrue(zkClient.getTopicPartitionCount(topic1).isEmpty)
+ assertTrue(zkClient.getPartitionAssignmentForTopics(Set(topic1)).isEmpty)
+ assertTrue(zkClient.getPartitionsForTopics(Set(topic1)).isEmpty)
+ assertTrue(zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).isEmpty)
val assignment = Map(
- new TopicPartition(topic, 0) -> Seq(0, 1),
- new TopicPartition(topic, 1) -> Seq(0, 1)
+ new TopicPartition(topic1, 0) -> Seq(0, 1),
+ new TopicPartition(topic1, 1) -> Seq(0, 1),
+ new TopicPartition(topic1, 2) -> Seq(1, 2, 3)
)
- zkClient.setTopicAssignmentRaw(topic, assignment)
- assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
- }
+ // create a topic assignment
+ zkClient.createTopicAssignment(topic1, assignment)
+
+ val expectedAssignment = assignment map { topicAssignment =>
+ val partition = topicAssignment._1.partition
+ val assignment = topicAssignment._2
+ partition -> assignment
+ }
+
+ assertEquals(assignment.size, zkClient.getTopicPartitionCount(topic1).get)
+ assertEquals(expectedAssignment, zkClient.getPartitionAssignmentForTopics(Set(topic1)).get(topic1).get)
+ assertEquals(Set(0, 1, 2), zkClient.getPartitionsForTopics(Set(topic1)).get(topic1).get.toSet)
+ assertEquals(Set(1, 2, 3), zkClient.getReplicasForPartition(new TopicPartition(topic1, 2)).toSet)
+
+ val updatedAssignment = assignment - new TopicPartition(topic1, 2)
+
+ zkClient.setTopicAssignment(topic1, updatedAssignment)
+ assertEquals(updatedAssignment.size, zkClient.getTopicPartitionCount(topic1).get)
+
+ // add second topic
+ val secondAssignment = Map(
+ new TopicPartition(topic2, 0) -> Seq(0, 1),
+ new TopicPartition(topic2, 1) -> Seq(0, 1)
+ )
+
+ zkClient.createTopicAssignment(topic2, secondAssignment)
+ assertEquals(Set(topic1, topic2), zkClient.getAllTopicsInCluster.toSet)
+ }
@Test
def testGetDataAndVersion() {
@@ -163,6 +192,9 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.deletePartitionReassignment()
assertEquals(Map.empty, zkClient.getPartitionReassignment)
+
+ zkClient.createPartitionReassignment(reassignment)
+ assertEquals(reassignment, zkClient.getPartitionReassignment)
}
@Test
@@ -276,6 +308,49 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
zkClient.deleteAclChangeNotifications()
assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+ }
+
+ @Test
+ def testDeleteTopicPathMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
+ assertFalse(zkClient.isTopicMarkedForDeletion(topic1))
+ assertTrue(zkClient.getTopicDeletions.isEmpty)
+
+ zkClient.createDeleteTopicPath(topic1)
+ zkClient.createDeleteTopicPath(topic2)
+
+ assertTrue(zkClient.isTopicMarkedForDeletion(topic1))
+ assertEquals(Set(topic1, topic2), zkClient.getTopicDeletions.toSet)
+
+ zkClient.deleteTopicDeletions(Seq(topic1, topic2))
+ assertTrue(zkClient.getTopicDeletions.isEmpty)
+ }
+
+ @Test
+ def testEntityConfigManagementMethods() {
+ val topic1 = "topic1"
+ val topic2 = "topic2"
+
+ assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
+
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, "1024")
+ logProps.put(LogConfig.SegmentIndexBytesProp, "1024")
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
+
+ logProps.remove(LogConfig.CleanupPolicyProp)
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic1, logProps)
+ assertEquals(logProps, zkClient.getEntityConfigs(ConfigType.Topic, topic1))
+
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic2, logProps)
+ assertEquals(Set(topic1, topic2), zkClient.getAllEntitiesWithConfig(ConfigType.Topic).toSet)
+ zkClient.deleteTopicConfigs(Seq(topic1, topic2))
+ assertTrue(zkClient.getEntityConfigs(ConfigType.Topic, topic1).isEmpty)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index cc1b9c1..4966b10 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -46,6 +46,8 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
var zkUtils: ZkUtils = null
var zooKeeperClient: ZooKeeperClient = null
var zkClient: KafkaZkClient = null
+ var adminZkClient: AdminZkClient = null
+
var zookeeper: EmbeddedZookeeper = null
def zkPort: Int = zookeeper.port
@@ -58,6 +60,7 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkMaxInFlightRequests)
zkClient = new KafkaZkClient(zooKeeperClient, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled()))
+ adminZkClient = new AdminZkClient(zkClient)
}
@After
[3/3] kafka git commit: KAFKA-5646;
Use KafkaZkClient in DynamicConfigManager and AdminManager
Posted by ju...@apache.org.
KAFKA-5646; Use KafkaZkClient in DynamicConfigManager and AdminManager
* Add AdminZkClient class
* Use KafkaZkClient, AdminZkClient in ConfigCommand, TopicCommand
* All the existing tests should work
Author: Manikumar Reddy <ma...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #4194 from omkreddy/KAFKA-5646-ZK-ADMIN-UTILS-DYNAMIC-MANAGER
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc852baf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc852baf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc852baf
Branch: refs/heads/trunk
Commit: bc852baffbf602ead9cb719a01747de414940d53
Parents: c5f31fe
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Nov 22 13:25:52 2017 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Nov 22 13:25:52 2017 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/admin/AdminUtils.scala | 15 +
.../main/scala/kafka/admin/ConfigCommand.scala | 37 +-
.../main/scala/kafka/admin/TopicCommand.scala | 86 ++--
.../main/scala/kafka/server/AdminManager.scala | 32 +-
.../kafka/server/DynamicConfigManager.scala | 16 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 10 +-
.../main/scala/kafka/server/KafkaServer.scala | 6 +-
.../src/main/scala/kafka/zk/AdminZkClient.scala | 417 +++++++++++++++++++
.../src/main/scala/kafka/zk/KafkaZkClient.scala | 339 ++++++++++++---
core/src/main/scala/kafka/zk/ZkData.scala | 27 +-
.../ReassignPartitionsIntegrationTest.scala | 2 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 5 +-
.../kafka/api/BaseProducerSendTest.scala | 3 +-
.../kafka/api/ClientIdQuotaTest.scala | 3 +-
.../api/RackAwareAutoTopicCreationTest.scala | 4 +-
.../kafka/api/UserClientIdQuotaTest.scala | 9 +-
.../integration/kafka/api/UserQuotaTest.scala | 5 +-
.../ReplicaFetcherThreadFatalErrorTest.scala | 3 +-
.../unit/kafka/admin/AddPartitionsTest.scala | 12 +-
.../unit/kafka/admin/ConfigCommandTest.scala | 78 ++--
.../unit/kafka/admin/DeleteTopicTest.scala | 38 +-
.../kafka/admin/DescribeConsumerGroupTest.scala | 4 +-
.../kafka/admin/ListConsumerGroupTest.scala | 3 +-
.../admin/ResetConsumerGroupOffsetTest.scala | 80 ++--
.../unit/kafka/admin/TopicCommandTest.scala | 44 +-
.../controller/ControllerFailoverTest.scala | 3 +-
...MetricsDuringTopicCreationDeletionTest.scala | 5 +-
.../kafka/integration/TopicMetadataTest.scala | 3 +-
.../integration/UncleanLeaderElectionTest.scala | 11 +-
.../scala/unit/kafka/metrics/MetricsTest.scala | 9 +-
.../unit/kafka/producer/SyncProducerTest.scala | 8 +-
.../kafka/server/DynamicConfigChangeTest.scala | 24 +-
.../unit/kafka/server/DynamicConfigTest.scala | 29 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 1 -
.../scala/unit/kafka/server/LogOffsetTest.scala | 9 +-
.../unit/kafka/server/OffsetCommitTest.scala | 4 +-
.../kafka/server/ReplicationQuotasTest.scala | 16 +-
.../unit/kafka/server/RequestQuotaTest.scala | 5 +-
...rivenReplicationProtocolAcceptanceTest.scala | 9 +-
.../epoch/LeaderEpochIntegrationTest.scala | 7 +-
.../scala/unit/kafka/zk/AdminZkClientTest.scala | 323 ++++++++++++++
.../scala/unit/kafka/zk/KafkaZkClientTest.scala | 101 ++++-
.../unit/kafka/zk/ZooKeeperTestHarness.scala | 3 +
43 files changed, 1459 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 32cab2a..09a65af 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -32,6 +32,7 @@ import scala.collection.JavaConverters._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.apache.kafka.common.internals.Topic
+@deprecated("This class is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
trait AdminUtilities {
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties)
@@ -267,6 +268,7 @@ object AdminUtils extends Logging with AdminUtilities {
* @param validateOnly If true, validate the parameters without actually adding the partitions
* @return the updated replica assignment
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def addPartitions(zkUtils: ZkUtils,
topic: String,
existingAssignment: Map[Int, Seq[Int]],
@@ -359,6 +361,7 @@ object AdminUtils extends Logging with AdminUtilities {
}
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def deleteTopic(zkUtils: ZkUtils, topic: String) {
if (topicExists(zkUtils, topic)) {
try {
@@ -434,6 +437,7 @@ object AdminUtils extends Logging with AdminUtilities {
def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.pathExists(getTopicPath(topic))
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def getBrokerMetadatas(zkUtils: ZkUtils, rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
val allBrokers = zkUtils.getAllBrokersInCluster()
@@ -452,6 +456,7 @@ object AdminUtils extends Logging with AdminUtilities {
brokerMetadatas.sortBy(_.id)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
@@ -463,6 +468,7 @@ object AdminUtils extends Logging with AdminUtilities {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def validateCreateOrUpdateTopic(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -501,6 +507,7 @@ object AdminUtils extends Logging with AdminUtilities {
LogConfig.validate(config)
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
@@ -548,6 +555,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) {
DynamicConfig.Client.validate(configs)
changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
@@ -564,6 +572,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) {
if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
DynamicConfig.Client.validate(configs)
@@ -589,6 +598,7 @@ object AdminUtils extends Logging with AdminUtilities {
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
validateTopicConfig(zkUtils, topic, configs)
changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
@@ -602,6 +612,7 @@ object AdminUtils extends Logging with AdminUtilities {
* @param brokers: The list of brokers to apply config changes to
* @param configs: The config to change, as properties
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def changeBrokerConfig(zkUtils: ZkUtils, brokers: Seq[Int], configs: Properties): Unit = {
DynamicConfig.Broker.validate(configs)
brokers.foreach { broker =>
@@ -637,6 +648,7 @@ object AdminUtils extends Logging with AdminUtilities {
* Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
* sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
*/
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = {
val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName)
// readDataMaybeNull returns Some(null) if the path exists, but there is no data
@@ -657,12 +669,15 @@ object AdminUtils extends Logging with AdminUtilities {
props
}
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] =
zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] =
zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap
+ @deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = {
def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = {
val root = rootPath match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index febf40f..077ecce 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,8 +24,10 @@ import kafka.common.Config
import kafka.common.InvalidConfigException
import kafka.log.LogConfig
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
-import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.CommandLineUtils
import kafka.utils.Implicits._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram._
import org.apache.kafka.common.utils.{Sanitizer, Utils}
@@ -61,26 +63,25 @@ object ConfigCommand extends Config {
opts.checkArgs()
- val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled())
+ val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+ val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+ val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.alterOpt))
- alterConfig(zkUtils, opts)
+ alterConfig(zkClient, opts, adminZkClient)
else if (opts.options.has(opts.describeOpt))
- describeConfig(zkUtils, opts)
+ describeConfig(zkClient, opts, adminZkClient)
} catch {
case e: Throwable =>
println("Error while executing config command " + e.getMessage)
println(Utils.stackTrace(e))
} finally {
- zkUtils.close()
+ zkClient.close()
}
}
- private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) {
+ private[admin] def alterConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
val configsToBeAdded = parseConfigsToBeAdded(opts)
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entity = parseEntity(opts)
@@ -91,7 +92,7 @@ object ConfigCommand extends Config {
preProcessScramCredentials(configsToBeAdded)
// compile the final set of configs
- val configs = utils.fetchEntityConfig(zkUtils, entityType, entityName)
+ val configs = adminZkClient.fetchEntityConfig(entityType, entityName)
// fail the command if any of the configs to be deleted does not exist
val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_))
@@ -101,7 +102,7 @@ object ConfigCommand extends Config {
configs ++= configsToBeAdded
configsToBeDeleted.foreach(configs.remove(_))
- utils.changeConfigs(zkUtils, entityType, entityName, configs)
+ adminZkClient.changeConfigs(entityType, entityName, configs)
println(s"Completed Updating config for entity: $entity.")
}
@@ -127,12 +128,12 @@ object ConfigCommand extends Config {
}
}
- private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
+ private def describeConfig(zkClient: KafkaZkClient, opts: ConfigCommandOptions, adminZkClient: AdminZkClient) {
val configEntity = parseEntity(opts)
val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
- val entities = configEntity.getAllEntities(zkUtils)
+ val entities = configEntity.getAllEntities(zkClient)
for (entity <- entities) {
- val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName)
+ val configs = adminZkClient.fetchEntityConfig(entity.root.entityType, entity.fullSanitizedName)
// When describing all users, don't include empty user nodes with only <user, client> quota overrides.
if (!configs.isEmpty || !describeAllUsers) {
println("Configs for %s are %s"
@@ -196,7 +197,7 @@ object ConfigCommand extends Config {
case class ConfigEntity(root: Entity, child: Option[Entity]) {
val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("")
- def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+ def getAllEntities(zkClient: KafkaZkClient) : Seq[ConfigEntity] = {
// Describe option examples:
// Describe entity with specified name:
// --entity-type topics --entity-name topic1 (topic1)
@@ -211,19 +212,19 @@ object ConfigCommand extends Config {
// --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>)
(root.sanitizedName, child) match {
case (None, _) =>
- val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+ val rootEntities = zkClient.getAllEntitiesWithConfig(root.entityType)
.map(name => ConfigEntity(Entity(root.entityType, Some(name)), child))
child match {
case Some(s) =>
rootEntities.flatMap(rootEntity =>
- ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils))
+ ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkClient))
case None => rootEntities
}
case (_, Some(childEntity)) =>
childEntity.sanitizedName match {
case Some(_) => Seq(this)
case None =>
- zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
+ zkClient.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType)
.map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name)))))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f2a74a0..bdd8aaf 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -25,18 +25,19 @@ import kafka.utils.Implicits._
import kafka.consumer.Whitelist
import kafka.log.LogConfig
import kafka.server.ConfigType
-import kafka.utils.ZkUtils._
import kafka.utils._
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import kafka.zookeeper.ZooKeeperClient
import org.apache.kafka.common.errors.{InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
+import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.collection._
-
object TopicCommand extends Logging {
def main(args: Array[String]): Unit = {
@@ -53,36 +54,35 @@ object TopicCommand extends Logging {
opts.checkArgs()
- val zkUtils = ZkUtils(opts.options.valueOf(opts.zkConnectOpt),
- 30000,
- 30000,
- JaasUtils.isZkSecurityEnabled())
+ val zooKeeperClient = new ZooKeeperClient(opts.options.valueOf(opts.zkConnectOpt), 30000, 30000, Int.MaxValue)
+ val zkClient = new KafkaZkClient(zooKeeperClient, JaasUtils.isZkSecurityEnabled())
+
var exitCode = 0
try {
if(opts.options.has(opts.createOpt))
- createTopic(zkUtils, opts)
+ createTopic(zkClient, opts)
else if(opts.options.has(opts.alterOpt))
- alterTopic(zkUtils, opts)
+ alterTopic(zkClient, opts)
else if(opts.options.has(opts.listOpt))
- listTopics(zkUtils, opts)
+ listTopics(zkClient, opts)
else if(opts.options.has(opts.describeOpt))
- describeTopic(zkUtils, opts)
+ describeTopic(zkClient, opts)
else if(opts.options.has(opts.deleteOpt))
- deleteTopic(zkUtils, opts)
+ deleteTopic(zkClient, opts)
} catch {
case e: Throwable =>
println("Error while executing topic command : " + e.getMessage)
error(Utils.stackTrace(e))
exitCode = 1
} finally {
- zkUtils.close()
+ zkClient.close()
Exit.exit(exitCode)
}
}
- private def getTopics(zkUtils: ZkUtils, opts: TopicCommandOptions): Seq[String] = {
- val allTopics = zkUtils.getAllTopics().sorted
+ private def getTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions): Seq[String] = {
+ val allTopics = zkClient.getAllTopicsInCluster.sorted
if (opts.options.has(opts.topicOpt)) {
val topicsSpec = opts.options.valueOf(opts.topicOpt)
val topicsFilter = new Whitelist(topicsSpec)
@@ -91,23 +91,24 @@ object TopicCommand extends Logging {
allTopics
}
- def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
+ def createTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
if (Topic.hasCollisionChars(topic))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
+ val adminZkClient = new AdminZkClient(zkClient)
try {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignment, configs, update = false)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
- AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
+ adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
@@ -115,15 +116,16 @@ object TopicCommand extends Logging {
}
}
- def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def alterTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
+ val adminZkClient = new AdminZkClient(zkClient)
topics.foreach { topic =>
- val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
println(" Going forward, please use kafka-configs.sh for this functionality")
@@ -133,7 +135,7 @@ object TopicCommand extends Logging {
// compile the final set of configs
configs ++= configsToBeAdded
configsToBeDeleted.foreach(config => configs.remove(config))
- AdminUtils.changeTopicConfig(zkUtils, topic, configs)
+ adminZkClient.changeTopicConfig(topic, configs)
println("Updated config for topic \"%s\".".format(topic))
}
@@ -144,7 +146,7 @@ object TopicCommand extends Logging {
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
"logic or ordering of the messages will be affected")
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
- val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+ val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
if (existingAssignment.isEmpty)
@@ -155,17 +157,17 @@ object TopicCommand extends Logging {
val partitionList = replicaAssignmentString.split(",").drop(startPartitionId)
AdminUtils.parseReplicaAssignment(partitionList.mkString(","), startPartitionId)
}
- val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
- AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
+ val allBrokers = adminZkClient.getBrokerMetadatas()
+ adminZkClient.addPartitions(topic, existingAssignment, allBrokers, nPartitions, newAssignment)
println("Adding partitions succeeded!")
}
}
}
- def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def listTopics(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
for(topic <- topics) {
- if (zkUtils.isTopicMarkedForDeletion(topic)) {
+ if (zkClient.isTopicMarkedForDeletion(topic)) {
println("%s - marked for deletion".format(topic))
} else {
println(topic)
@@ -173,8 +175,8 @@ object TopicCommand extends Logging {
}
}
- def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def deleteTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
@@ -185,12 +187,12 @@ object TopicCommand extends Logging {
if (Topic.isInternal(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
- zkUtils.createPersistentPath(getDeleteTopicPath(topic))
+ zkClient.createDeleteTopicPath(topic)
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
- case _: ZkNodeExistsException =>
+ case _: NodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
@@ -200,21 +202,23 @@ object TopicCommand extends Logging {
}
}
- def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
- val topics = getTopics(zkUtils, opts)
+ def describeTopic(zkClient: KafkaZkClient, opts: TopicCommandOptions) {
+ val topics = getTopics(zkClient, opts)
val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
- val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
+ val liveBrokers = zkClient.getAllBrokersInCluster.map(_.id).toSet
+ val adminZkClient = new AdminZkClient(zkClient)
+
for (topic <- topics) {
- zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
+ zkClient.getPartitionAssignmentForTopics(immutable.Set(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1)
- val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic)
+ val markedForDeletion = zkClient.isTopicMarkedForDeletion(topic)
if (describeConfigs) {
- val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
+ val configs = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic).asScala
if (!reportOverriddenConfigs || configs.nonEmpty) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
@@ -226,8 +230,10 @@ object TopicCommand extends Logging {
}
if (describePartitions) {
for ((partitionId, assignedReplicas) <- sortedPartitions) {
- val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
- val leader = zkUtils.getLeaderForPartition(topic, partitionId)
+ val leaderIsrEpoch = zkClient.getTopicPartitionState(new TopicPartition(topic, partitionId))
+ val inSyncReplicas = if (leaderIsrEpoch.isEmpty) Seq.empty[Int] else leaderIsrEpoch.get.leaderAndIsr.isr
+ val leader = if (leaderIsrEpoch.isEmpty) None else Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 935fade..8f69000 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.NewPartitions
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
@@ -41,11 +42,12 @@ import scala.collection.JavaConverters._
class AdminManager(val config: KafkaConfig,
val metrics: Metrics,
val metadataCache: MetadataCache,
- val zkUtils: ZkUtils) extends Logging with KafkaMetricsGroup {
+ val zkClient: KafkaZkClient) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Admin Manager on Broker " + config.brokerId + "]: "
private val topicPurgatory = DelayedOperationPurgatory[DelayedOperation]("topic", config.brokerId)
+ private val adminZkClient = new AdminZkClient(zkClient)
private val createTopicPolicy =
Option(config.getConfiguredInstance(KafkaConfig.CreateTopicPolicyClassNameProp, classOf[CreateTopicPolicy]))
@@ -101,7 +103,7 @@ class AdminManager(val config: KafkaConfig,
createTopicPolicy match {
case Some(policy) =>
- AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
// Use `null` for unset fields in the public API
val numPartitions: java.lang.Integer =
@@ -114,13 +116,13 @@ class AdminManager(val config: KafkaConfig,
arguments.configs))
if (!validateOnly)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
case None =>
if (validateOnly)
- AdminUtils.validateCreateOrUpdateTopic(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.validateCreateOrUpdateTopic(topic, assignments, configs, update = false)
else
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, assignments, configs, update = false)
}
CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
} catch {
@@ -165,7 +167,7 @@ class AdminManager(val config: KafkaConfig,
// 1. map over topics calling the asynchronous delete
val metadata = topics.map { topic =>
try {
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
DeleteTopicMetadata(topic, Errors.NONE)
} catch {
case _: TopicAlreadyMarkedForDeletionException =>
@@ -203,8 +205,8 @@ class AdminManager(val config: KafkaConfig,
listenerName: ListenerName,
callback: Map[String, ApiError] => Unit): Unit = {
- val reassignPartitionsInProgress = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
- val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress
+ val allBrokers = adminZkClient.getBrokerMetadatas()
val allBrokerIds = allBrokers.map(_.id)
// 1. map over topics creating assignment and calling AdminUtils
@@ -215,7 +217,7 @@ class AdminManager(val config: KafkaConfig,
if (reassignPartitionsInProgress)
throw new ReassignmentInProgressException("A partition reassignment is in progress.")
- val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+ val existingAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
if (existingAssignment.isEmpty)
@@ -247,7 +249,7 @@ class AdminManager(val config: KafkaConfig,
}.toMap
}
- val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers,
+ val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers,
newPartition.totalCount, reassignment, validateOnly = validateOnly)
CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
} catch {
@@ -306,7 +308,7 @@ class AdminManager(val config: KafkaConfig,
val topic = resource.name
Topic.validate(topic)
// Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
- val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
@@ -350,19 +352,19 @@ class AdminManager(val config: KafkaConfig,
alterConfigPolicy match {
case Some(policy) =>
- AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ adminZkClient.validateTopicConfig(topic, properties)
val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
policy.validate(new AlterConfigPolicy.RequestMetadata(
new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
if (!validateOnly)
- AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ adminZkClient.changeTopicConfig(topic, properties)
case None =>
if (validateOnly)
- AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+ adminZkClient.validateTopicConfig(topic, properties)
else
- AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+ adminZkClient.changeTopicConfig(topic, properties)
}
resource -> ApiError.NONE
case resourceType =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 6392723..457742d 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -26,7 +26,7 @@ import scala.collection._
import scala.collection.JavaConverters._
import kafka.admin.AdminUtils
import kafka.utils.json.JsonObject
-import kafka.zk.KafkaZkClient
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.security.scram.ScramMechanism
import org.apache.kafka.common.utils.Time
@@ -84,11 +84,11 @@ object ConfigEntityName {
* on startup where a change might be missed between the initial config load and registering for change notifications.
*
*/
-class DynamicConfigManager(private val oldZkUtils: ZkUtils,
- private val zkClient: KafkaZkClient,
+class DynamicConfigManager(private val zkClient: KafkaZkClient,
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = Time.SYSTEM) extends Logging {
+ val adminZkClient = new AdminZkClient(zkClient)
object ConfigChangedNotificationHandler extends NotificationHandler {
override def processNotification(json: String) = {
@@ -120,7 +120,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json)
}
- val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, entityType, entity)
+ val entityConfig = adminZkClient.fetchEntityConfig(entityType, entity)
info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
configHandlers(entityType).processConfigChanges(entity, entityConfig)
@@ -141,7 +141,7 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
}
val fullSanitizedEntityName = entityPath.substring(index + 1)
- val entityConfig = AdminUtils.fetchEntityConfig(oldZkUtils, rootEntityType, fullSanitizedEntityName)
+ val entityConfig = adminZkClient.fetchEntityConfig(rootEntityType, fullSanitizedEntityName)
val loggableConfig = entityConfig.asScala.map {
case (k, v) => (k, if (ScramMechanism.isScram(k)) Password.HIDDEN else v)
}
@@ -163,14 +163,14 @@ class DynamicConfigManager(private val oldZkUtils: ZkUtils,
// Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
configHandlers.foreach {
case (ConfigType.User, handler) =>
- AdminUtils.fetchAllEntityConfigs(oldZkUtils, ConfigType.User).foreach {
+ adminZkClient.fetchAllEntityConfigs(ConfigType.User).foreach {
case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties)
}
- AdminUtils.fetchAllChildEntityConfigs(oldZkUtils, ConfigType.User, ConfigType.Client).foreach {
+ adminZkClient.fetchAllChildEntityConfigs(ConfigType.User, ConfigType.Client).foreach {
case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties)
}
case (configType, handler) =>
- AdminUtils.fetchAllEntityConfigs(oldZkUtils, configType).foreach {
+ adminZkClient.fetchAllEntityConfigs(configType).foreach {
case (entityName, properties) => handler.processConfigChanges(entityName, properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index de56986..a31b6c3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -29,7 +29,7 @@ import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.cluster.Partition
import kafka.common.{OffsetAndMetadata, OffsetMetadata}
import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
-import kafka.controller.{KafkaController}
+import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.log.{Log, LogManager, TimestampOffset}
@@ -37,8 +37,8 @@ import kafka.network.RequestChannel
import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import kafka.zk.KafkaZkClient
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
@@ -71,7 +71,6 @@ class KafkaApis(val requestChannel: RequestChannel,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
- val zkUtils: ZkUtils,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
@@ -84,6 +83,7 @@ class KafkaApis(val requestChannel: RequestChannel,
time: Time) extends Logging {
this.logIdent = "[KafkaApi-%d] ".format(brokerId)
+ val adminZkClient = new AdminZkClient(zkClient)
def close() {
info("Shutdown complete.")
@@ -829,7 +829,7 @@ class KafkaApis(val requestChannel: RequestChannel,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata = {
try {
- AdminUtils.createTopic(zkUtils, topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
+ adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful"
.format(topic, numPartitions, replicationFactor))
new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1812eb0..7f61479 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -242,7 +242,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
kafkaController = new KafkaController(config, zkClient, time, metrics, threadNamePrefix)
kafkaController.startup()
- adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
+ adminManager = new AdminManager(config, metrics, metadataCache, zkClient)
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
@@ -263,7 +263,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
- kafkaController, zkUtils, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+ kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
brokerTopicStats, clusterId, time)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
@@ -278,7 +278,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
// Create the config manager. start listening to notifications
- dynamicConfigManager = new DynamicConfigManager(zkUtils, zkClient, dynamicConfigHandlers)
+ dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
/* tell everyone we are alive */
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/AdminZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
new file mode 100644
index 0000000..e00b8e6
--- /dev/null
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.zk
+
+import java.util.Properties
+
+import kafka.admin.{AdminOperationException, AdminUtils, BrokerMetadata, RackAwareMode}
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import kafka.log.LogConfig
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
+import org.apache.zookeeper.KeeperException.NodeExistsException
+
+import scala.collection.{Map, Seq}
+
+class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
+
+ /**
+ * Creates the topic with given configuration
+ * @param topic topic name to create
+ * @param partitions Number of partitions to be set
+ * @param replicationFactor Replication factor
+ * @param topicConfig topic configs
+ * @param rackAwareMode
+ */
+ def createTopic(topic: String,
+ partitions: Int,
+ replicationFactor: Int,
+ topicConfig: Properties = new Properties,
+ rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
+ val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
+ val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
+ createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment, topicConfig)
+ }
+
+ /**
+ * Gets broker metadata list
+ * @param rackAwareMode
+ * @param brokerList
+ * @return
+ */
+ def getBrokerMetadatas(rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
+ brokerList: Option[Seq[Int]] = None): Seq[BrokerMetadata] = {
+ val allBrokers = zkClient.getAllBrokersInCluster
+ val brokers = brokerList.map(brokerIds => allBrokers.filter(b => brokerIds.contains(b.id))).getOrElse(allBrokers)
+ val brokersWithRack = brokers.filter(_.rack.nonEmpty)
+ if (rackAwareMode == RackAwareMode.Enforced && brokersWithRack.nonEmpty && brokersWithRack.size < brokers.size) {
+ throw new AdminOperationException("Not all brokers have rack information. Add --disable-rack-aware in command line" +
+ " to make replica assignment without rack information.")
+ }
+ val brokerMetadatas = rackAwareMode match {
+ case RackAwareMode.Disabled => brokers.map(broker => BrokerMetadata(broker.id, None))
+ case RackAwareMode.Safe if brokersWithRack.size < brokers.size =>
+ brokers.map(broker => BrokerMetadata(broker.id, None))
+ case _ => brokers.map(broker => BrokerMetadata(broker.id, broker.rack))
+ }
+ brokerMetadatas.sortBy(_.id)
+ }
+
+ /**
+ * Creates or Updates the partition assignment for a given topic
+ * @param topic
+ * @param partitionReplicaAssignment
+ * @param config
+ * @param update
+ */
+ def createOrUpdateTopicPartitionAssignmentPathInZK(topic: String,
+ partitionReplicaAssignment: Map[Int, Seq[Int]],
+ config: Properties = new Properties,
+ update: Boolean = false) {
+ validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)
+
+ // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+ if (!update) {
+ // write out the config if there is any, this isn't transactional with the partition assignments
+ zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
+ }
+
+ // create the partition assignment
+ writeTopicPartitionAssignment(topic, partitionReplicaAssignment, update)
+ }
+
+ /**
+ * Validate method to use before the topic creation or update
+ * @param topic
+ * @param partitionReplicaAssignment
+ * @param config
+ * @param update
+ */
+ def validateCreateOrUpdateTopic(topic: String,
+ partitionReplicaAssignment: Map[Int, Seq[Int]],
+ config: Properties,
+ update: Boolean): Unit = {
+ // validate arguments
+ Topic.validate(topic)
+
+ if (!update) {
+ if (zkClient.topicExists(topic))
+ throw new TopicExistsException(s"Topic '$topic' already exists.")
+ else if (Topic.hasCollisionChars(topic)) {
+ val allTopics = zkClient.getAllTopicsInCluster
+ // check again in case the topic was created in the meantime, otherwise the
+ // topic could potentially collide with itself
+ if (allTopics.contains(topic))
+ throw new TopicExistsException(s"Topic '$topic' already exists.")
+ val collidingTopics = allTopics.filter(Topic.hasCollision(topic, _))
+ if (collidingTopics.nonEmpty) {
+ throw new InvalidTopicException(s"Topic '$topic' collides with existing topics: ${collidingTopics.mkString(", ")}")
+ }
+ }
+ }
+
+ if (partitionReplicaAssignment.values.map(_.size).toSet.size != 1)
+ throw new InvalidReplicaAssignmentException("All partitions should have the same number of replicas")
+
+ partitionReplicaAssignment.values.foreach(reps =>
+ if (reps.size != reps.toSet.size)
+ throw new InvalidReplicaAssignmentException("Duplicate replica assignment found: " + partitionReplicaAssignment)
+ )
+
+ // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
+ if (!update)
+ LogConfig.validate(config)
+ }
+
+ private def writeTopicPartitionAssignment(topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean) {
+ try {
+ val assignment = replicaAssignment.map { case (partitionId, replicas) => (new TopicPartition(topic,partitionId), replicas) }.toMap
+
+ if (!update) {
+ info("Topic creation " + assignment)
+ zkClient.createTopicAssignment(topic, assignment)
+ } else {
+ info("Topic update " + assignment)
+ zkClient.setTopicAssignment(topic, assignment)
+ }
+ debug("Updated path %s with %s for replica assignment".format(TopicZNode.path(topic), assignment))
+ } catch {
+ case _: NodeExistsException => throw new TopicExistsException(s"Topic '$topic' already exists.")
+ case e2: Throwable => throw new AdminOperationException(e2.toString)
+ }
+ }
+
+
+ /**
+ * Creates a delete path for a given topic
+ * @param topic
+ */
+ def deleteTopic(topic: String) {
+ if (zkClient.topicExists(topic)) {
+ try {
+ zkClient.createDeleteTopicPath(topic)
+ } catch {
+ case _: NodeExistsException => throw new TopicAlreadyMarkedForDeletionException(
+ "topic %s is already marked for deletion".format(topic))
+ case e: Throwable => throw new AdminOperationException(e.getMessage)
+ }
+ } else {
+ throw new UnknownTopicOrPartitionException(s"Topic `$topic` to delete does not exist")
+ }
+ }
+
+ /**
+ * Add partitions to existing topic with optional replica assignment
+ *
+ * @param topic Topic for adding partitions to
+ * @param existingAssignment A map from partition id to its assigned replicas
+ * @param allBrokers All brokers in the cluster
+ * @param numPartitions Number of partitions to be set
+ * @param replicaAssignment Manual replica assignment, or none
+ * @param validateOnly If true, validate the parameters without actually adding the partitions
+ * @return the updated replica assignment
+ */
+ def addPartitions(topic: String,
+ existingAssignment: Map[Int, Seq[Int]],
+ allBrokers: Seq[BrokerMetadata],
+ numPartitions: Int = 1,
+ replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
+ validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
+ val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
+ throw new AdminOperationException(
+ s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+ s"Assignment: $existingAssignment"))
+
+ val partitionsToAdd = numPartitions - existingAssignment.size
+ if (partitionsToAdd <= 0)
+ throw new InvalidPartitionsException(
+ s"The number of partitions for a topic can only be increased. " +
+ s"Topic $topic currently has ${existingAssignment.size} partitions, " +
+ s"$numPartitions would not be an increase.")
+
+ replicaAssignment.foreach { proposedReplicaAssignment =>
+ validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0.size,
+ allBrokers.map(_.id).toSet)
+ }
+
+ val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
+ val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
+ AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
+ startIndex, existingAssignment.size)
+ }
+ val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
+ if (!validateOnly) {
+ info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+ s"$proposedAssignmentForNewPartitions.")
+ // add the combined new list
+ createOrUpdateTopicPartitionAssignmentPathInZK(topic, proposedAssignment, update = true)
+ }
+ proposedAssignment
+
+ }
+
+ private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+ expectedReplicationFactor: Int,
+ availableBrokerIds: Set[Int]): Unit = {
+
+ replicaAssignment.foreach { case (partitionId, replicas) =>
+ if (replicas.isEmpty)
+ throw new InvalidReplicaAssignmentException(
+ s"Cannot have replication factor of 0 for partition id $partitionId.")
+ if (replicas.size != replicas.toSet.size)
+ throw new InvalidReplicaAssignmentException(
+ s"Duplicate brokers not allowed in replica assignment: " +
+ s"${replicas.mkString(", ")} for partition id $partitionId.")
+ if (!replicas.toSet.subsetOf(availableBrokerIds))
+ throw new BrokerNotAvailableException(
+ s"Some brokers specified for partition id $partitionId are not available. " +
+ s"Specified brokers: ${replicas.mkString(", ")}, " +
+ s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+ partitionId -> replicas.size
+ }
+ val badRepFactors = replicaAssignment.collect {
+ case (partition, replicas) if replicas.size != expectedReplicationFactor => partition -> replicas.size
+ }
+ if (badRepFactors.nonEmpty) {
+ val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+ val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+ val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
+ throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
+ s"partition 0 has ${expectedReplicationFactor} while partitions [${partitions.mkString(", ")}] have " +
+ s"replication factors [${repFactors.mkString(", ")}], respectively.")
+ }
+ }
+
+ /**
+ * Change the configs for a given entityType and entityName
+ * @param entityType
+ * @param entityName
+ * @param configs
+ */
+ def changeConfigs(entityType: String, entityName: String, configs: Properties): Unit = {
+
+ def parseBroker(broker: String): Int = {
+ try broker.toInt
+ catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value")
+ }
+ }
+
+ entityType match {
+ case ConfigType.Topic => changeTopicConfig(entityName, configs)
+ case ConfigType.Client => changeClientIdConfig(entityName, configs)
+ case ConfigType.User => changeUserOrUserClientIdConfig(entityName, configs)
+ case ConfigType.Broker => changeBrokerConfig(Seq(parseBroker(entityName)), configs)
+ case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}")
+ }
+ }
+
+ /**
+ * Update the config for a client and create a change notification so the change will propagate to other brokers.
+ * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId>
+ * and <user> configs are not specified.
+ *
+ * @param sanitizedClientId: The sanitized clientId for which configs are being changed
+ * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+ * existing configs need to be deleted, it should be done prior to invoking this API
+ *
+ */
+ def changeClientIdConfig(sanitizedClientId: String, configs: Properties) {
+ DynamicConfig.Client.validate(configs)
+ changeEntityConfig(ConfigType.Client, sanitizedClientId, configs)
+ }
+
+ /**
+ * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers.
+ * User and/or clientId components of the path may be <default>, indicating that the configuration is the default
+ * value to be applied if a more specific override is not configured.
+ *
+ * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId>
+ * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+ * existing configs need to be deleted, it should be done prior to invoking this API
+ *
+ */
+ def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties) {
+ if (sanitizedEntityName == ConfigEntityName.Default || sanitizedEntityName.contains("/clients"))
+ DynamicConfig.Client.validate(configs)
+ else
+ DynamicConfig.User.validate(configs)
+ changeEntityConfig(ConfigType.User, sanitizedEntityName, configs)
+ }
+
+ /**
+ * validates the topic configs
+ * @param topic
+ * @param configs
+ */
+ def validateTopicConfig(topic: String, configs: Properties): Unit = {
+ Topic.validate(topic)
+ if (!zkClient.topicExists(topic))
+ throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
+ // remove the topic overrides
+ LogConfig.validate(configs)
+ }
+
+ /**
+ * Update the config for an existing topic and create a change notification so the change will propagate to other brokers
+ *
+ * @param topic: The topic for which configs are being changed
+ * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
+ * existing configs need to be deleted, it should be done prior to invoking this API
+ *
+ */
+ def changeTopicConfig(topic: String, configs: Properties): Unit = {
+ validateTopicConfig(topic, configs)
+ changeEntityConfig(ConfigType.Topic, topic, configs)
+ }
+
+ /**
+ * Override the broker config on some set of brokers. These overrides will be persisted between sessions, and will
+ * override any defaults entered in the broker's config files
+ *
+ * @param brokers: The list of brokers to apply config changes to
+ * @param configs: The config to change, as properties
+ */
+ def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
+ DynamicConfig.Broker.validate(configs)
+ brokers.foreach { broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs)
+ }
+ }
+
+ private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
+ val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
+ zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
+
+ // create the change notification
+ zkClient.createConfigChangeNotification(sanitizedEntityPath)
+ }
+
+ /**
+ * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk
+ * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>.
+ * @param rootEntityType
+ * @param sanitizedEntityName
+ * @return
+ */
+ def fetchEntityConfig(rootEntityType: String, sanitizedEntityName: String): Properties = {
+ zkClient.getEntityConfigs(rootEntityType, sanitizedEntityName)
+ }
+
+ /**
+ * Gets all topic configs
+ * @return
+ */
+ def getAllTopicConfigs(): Map[String, Properties] =
+ zkClient.getAllTopicsInCluster.map(topic => (topic, fetchEntityConfig(ConfigType.Topic, topic))).toMap
+
+ /**
+ * Gets all the entity configs for a given entityType
+ * @param entityType
+ * @return
+ */
+ def fetchAllEntityConfigs(entityType: String): Map[String, Properties] =
+ zkClient.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(entityType, entity))).toMap
+
+ /**
+ * Gets all the entity configs for a given childEntityType
+ * @param rootEntityType
+ * @param childEntityType
+ * @return
+ */
+ def fetchAllChildEntityConfigs(rootEntityType: String, childEntityType: String): Map[String, Properties] = {
+ def entityPaths(rootPath: Option[String]): Seq[String] = {
+ val root = rootPath match {
+ case Some(path) => rootEntityType + '/' + path
+ case None => rootEntityType
+ }
+ val entityNames = zkClient.getAllEntitiesWithConfig(root)
+ rootPath match {
+ case Some(path) => entityNames.map(entityName => path + '/' + entityName)
+ case None => entityNames
+ }
+ }
+ entityPaths(None)
+ .flatMap(entity => entityPaths(Some(entity + '/' + childEntityType)))
+ .map(entityPath => (entityPath, fetchEntityConfig(rootEntityType, entityPath))).toMap
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 24d7ba9..b419654 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,7 @@ import java.util.Properties
import kafka.api.LeaderAndIsr
import kafka.cluster.Broker
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{LeaderIsrAndControllerEpoch, ReassignedPartitionsContext}
import kafka.log.LogConfig
import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
import kafka.security.auth.{Acl, Resource, ResourceType}
@@ -33,8 +33,8 @@ import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.data.{ACL, Stat}
import org.apache.zookeeper.{CreateMode, KeeperException}
-import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
/**
* Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]].
@@ -168,7 +168,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
configResponse.resultCode match {
case Code.OK =>
val overrides = ConfigEntityZNode.decode(configResponse.data)
- val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+ val logConfig = LogConfig.fromProps(config, overrides)
logConfigs.put(topic, logConfig)
case Code.NONODE =>
val logConfig = LogConfig.fromProps(config, new Properties)
@@ -180,32 +180,100 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Get entity configs for a given entity name
+ * @param rootEntityType entity type
+ * @param sanitizedEntityName entity name
+ * @return The successfully gathered log configs
+ */
+ def getEntityConfigs(rootEntityType: String, sanitizedEntityName: String): Properties = {
+ val getDataRequest = GetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName))
+ val getDataResponse = retryRequestUntilConnected(getDataRequest)
+
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ ConfigEntityZNode.decode(getDataResponse.data)
+ case Code.NONODE => new Properties()
+ case _ => throw getDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Sets or creates the entity znode path with the given configs depending
+ * on whether it already exists or not.
+ * @param rootEntityType entity type
+ * @param sanitizedEntityName entity name
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def setOrCreateEntityConfigs(rootEntityType: String, sanitizedEntityName: String, config: Properties) = {
+
+ def set(configData: Array[Byte]): SetDataResponse = {
+ val setDataRequest = SetDataRequest(ConfigEntityZNode.path(rootEntityType, sanitizedEntityName), ConfigEntityZNode.encode(config), ZkVersion.NoVersion)
+ retryRequestUntilConnected(setDataRequest)
+ }
+
+ def create(configData: Array[Byte]) = {
+ val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
+ createRecursive(path, ConfigEntityZNode.encode(config))
+ }
+
+ val configData = ConfigEntityZNode.encode(config)
+
+ val setDataResponse = set(configData)
+ setDataResponse.resultCode match {
+ case Code.NONODE => create(configData)
+ case _ => setDataResponse.resultException.foreach(e => throw e)
+ }
+ }
+
+ /**
+ * Returns all the entities for a given entityType
+ * @param entityType entity type
+ * @return List of all entity names
+ */
+ def getAllEntitiesWithConfig(entityType: String): Seq[String] = {
+ getChildren(ConfigEntityTypeZNode.path(entityType))
+ }
+
+ /**
+ * Creates config change notification
+ * @param sanitizedEntityPath sanitizedEntityPath path to write
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def createConfigChangeNotification(sanitizedEntityPath: String): Unit = {
+ val path = ConfigEntityChangeNotificationSequenceZNode.createPath
+ val createRequest = CreateRequest(path, ConfigEntityChangeNotificationSequenceZNode.encode(sanitizedEntityPath), acls(path), CreateMode.PERSISTENT_SEQUENTIAL)
+ val createResponse = retryRequestUntilConnected(createRequest)
+ if (createResponse.resultCode != Code.OK) {
+ createResponse.resultException.foreach(e => throw e)
+ }
+ }
+
+ /**
* Gets all brokers in the cluster.
* @return sequence of brokers in the cluster.
*/
def getAllBrokersInCluster: Seq[Broker] = {
- val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(BrokerIdsZNode.path))
- getChildrenResponse.resultCode match {
- case Code.OK =>
- val brokerIds = getChildrenResponse.children.map(_.toInt)
- val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
- val getDataResponses = retryRequestsUntilConnected(getDataRequests)
- getDataResponses.flatMap { getDataResponse =>
- val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
- getDataResponse.resultCode match {
- case Code.OK =>
- Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
- case Code.NONODE => None
- case _ => throw getDataResponse.resultException.get
- }
- }
- case Code.NONODE =>
- Seq.empty
- case _ =>
- throw getChildrenResponse.resultException.get
+ val brokerIds = getSortedBrokerList
+ val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), ctx = Some(brokerId)))
+ val getDataResponses = retryRequestsUntilConnected(getDataRequests)
+ getDataResponses.flatMap { getDataResponse =>
+ val brokerId = getDataResponse.ctx.get.asInstanceOf[Int]
+ getDataResponse.resultCode match {
+ case Code.OK =>
+ Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+ case Code.NONODE => None
+ case _ => throw getDataResponse.resultException.get
+ }
}
}
+
+ /**
+ * Gets the list of sorted broker Ids
+ */
+ def getSortedBrokerList(): Seq[Int] =
+ getChildren(BrokerIdsZNode.path).map(_.toInt).sorted
+
/**
* Gets all topics in the cluster.
* @return sequence of topics in the cluster.
@@ -221,6 +289,15 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Checks the topic existence
+ * @param topicName
+ * @return true if topic exists else false
+ */
+ def topicExists(topicName: String): Boolean = {
+ pathExists(TopicZNode.path(topicName))
+ }
+
+ /**
* Sets the topic znode with the given assignment.
* @param topic the topic whose assignment is being set.
* @param assignment the partition to replica mapping to set for the given topic
@@ -232,6 +309,29 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Sets the topic znode with the given assignment.
+ * @param topic the topic whose assignment is being set.
+ * @param assignment the partition to replica mapping to set for the given topic
+ * @throws KeeperException if there is an error while setting assignment
+ */
+ def setTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+ val setDataResponse = setTopicAssignmentRaw(topic, assignment)
+ if (setDataResponse.resultCode != Code.OK) {
+ setDataResponse.resultException.foreach(e => throw e)
+ }
+ }
+
+ /**
+ * Create the topic znode with the given assignment.
+ * @param topic the topic whose assignment is being set.
+ * @param assignment the partition to replica mapping to set for the given topic
+ * @throws KeeperException if there is an error while creating assignment
+ */
+ def createTopicAssignment(topic: String, assignment: Map[TopicPartition, Seq[Int]]) = {
+ createRecursive(TopicZNode.path(topic), TopicZNode.encode(assignment))
+ }
+
+ /**
* Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
* @return sequence of znode names and not the absolute znode path.
*/
@@ -271,7 +371,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteLogDirEventNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -305,6 +405,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets partition the assignments for the given topics.
+ * @param topics the topics whose partitions we wish to get the assignments for.
+ * @return the partition assignment for each partition from the given topics.
+ */
+ def getPartitionAssignmentForTopics(topics: Set[String]): Map[String, Map[Int, Seq[Int]]] = {
+ val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
+ val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
+ getDataResponses.flatMap { getDataResponse =>
+ val topic = getDataResponse.ctx.get.asInstanceOf[String]
+ if (getDataResponse.resultCode == Code.OK) {
+ val partitionMap = TopicZNode.decode(topic, getDataResponse.data).map { case (k, v) => (k.partition, v) }
+ Map(topic -> partitionMap)
+ } else if (getDataResponse.resultCode == Code.NONODE) {
+ Map.empty[String, Map[Int, Seq[Int]]]
+ } else {
+ throw getDataResponse.resultException.get
+ }
+ }.toMap
+ }
+
+ /**
+ * Gets the partition numbers for the given topics
+ * @param topics the topics whose partitions we wish to get.
+ * @return the partition array for each topic from the given topics.
+ */
+ def getPartitionsForTopics(topics: Set[String]): Map[String, Seq[Int]] = {
+ getPartitionAssignmentForTopics(topics).map { topicAndPartitionMap =>
+ val topic = topicAndPartitionMap._1
+ val partitionMap = topicAndPartitionMap._2
+ topic -> partitionMap.keys.toSeq.sortWith((s, t) => s < t)
+ }
+ }
+
+ /**
* Gets the partition count for a given topic
* @param topic The topic to get partition count for.
* @return optional integer that is Some if the topic exists and None otherwise.
@@ -318,6 +452,16 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets the assigned replicas for a specific topic and partition
+ * @param topicPartition TopicAndPartition to get assigned replicas for .
+ * @return List of assigned replicas
+ */
+ def getReplicasForPartition(topicPartition: TopicPartition): Seq[Int] = {
+ val topicData = getReplicaAssignmentForTopics(Set(topicPartition.topic))
+ topicData.getOrElse(topicPartition, Seq.empty)
+ }
+
+ /**
* Gets the data and version at the given zk path
* @param path zk node path
* @return A tuple of 2 elements, where first element is zk node data as string
@@ -413,6 +557,25 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Creates the delete topic znode.
+ * @param topicName topic name
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def createDeleteTopicPath(topicName: String): Unit = {
+ createRecursive(DeleteTopicsTopicZNode.path(topicName))
+ }
+
+
+ /**
+ * Checks if topic is marked for deletion
+ * @param topic
+ * @return true if topic is marked for deletion, else false
+ */
+ def isTopicMarkedForDeletion(topic: String): Boolean = {
+ pathExists(DeleteTopicsTopicZNode.path(topic))
+ }
+
+ /**
* Get all topics marked for deletion.
* @return sequence of topics marked for deletion.
*/
@@ -479,6 +642,21 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Creates the partition reassignment znode with the given reassignment.
+ * @param reassignment the reassignment to set on the reassignment znode.
+ * @throws KeeperException if there is an error while setting or creating the znode
+ */
+ def createPartitionReassignment(reassignment: Map[TopicPartition, Seq[Int]]) = {
+ val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment),
+ acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT)
+ val createResponse = retryRequestUntilConnected(createRequest)
+
+ if (createResponse.resultCode != Code.OK) {
+ throw createResponse.resultException.get
+ }
+ }
+
+ /**
* Deletes the partition reassignment znode.
*/
def deletePartitionReassignment(): Unit = {
@@ -487,6 +665,22 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Checks if reassign partitions is in progress
+ * @return true if reassign partitions is in progress, else false
+ */
+ def reassignPartitionsInProgress(): Boolean = {
+ pathExists(ReassignPartitionsZNode.path)
+ }
+
+ /**
+ * Gets the partitions being reassigned for given topics
+ * @return ReassignedPartitionsContexts for each topic which are being reassigned.
+ */
+ def getPartitionsBeingReassigned(): Map[TopicPartition, ReassignedPartitionsContext] = {
+ getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
+ }
+
+ /**
* Gets topic partition states for the given partitions.
* @param partitions the partitions for which we want to get states.
* @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
@@ -504,6 +698,35 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
/**
+ * Gets topic partition state for the given partition.
+ * @param partition the partition for which we want to get state.
+ * @return LeaderIsrAndControllerEpoch of the partition state if exists, else None
+ */
+ def getTopicPartitionState(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+ val getDataResponse = getTopicPartitionStatesRaw(Seq(partition)).head
+ if (getDataResponse.resultCode == Code.OK) {
+ TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
+ } else if (getDataResponse.resultCode == Code.NONODE) {
+ None
+ } else {
+ throw getDataResponse.resultException.get
+ }
+ }
+
+ /**
+ * Gets the leader for a given partition
+ * @param partition
+ * @return optional integer if the leader exists and None otherwise.
+ */
+ def getLeaderForPartition(partition: TopicPartition): Option[Int] = {
+ val leaderIsrEpoch = getTopicPartitionState(partition)
+ if (leaderIsrEpoch.isDefined)
+ Option(leaderIsrEpoch.get.leaderAndIsr.leader)
+ else
+ None
+ }
+
+ /**
* Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
* @return sequence of znode names and not the absolute znode path.
*/
@@ -543,7 +766,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteIsrChangeNotifications(getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber))
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -641,9 +864,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
* Creates the required zk nodes for Acl storage
*/
def createAclPaths(): Unit = {
- createRecursive(AclZNode.path)
- createRecursive(AclChangeNotificationZNode.path)
- ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name)))
+ createRecursive(AclZNode.path, throwIfPathExists = false)
+ createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
+ ResourceType.values.foreach(resource => createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = false))
}
/**
@@ -719,7 +942,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
if (getChildrenResponse.resultCode == Code.OK) {
deleteAclChangeNotifications(getChildrenResponse.children)
} else if (getChildrenResponse.resultCode != Code.NONODE) {
- throw getChildrenResponse.resultException.get
+ getChildrenResponse.resultException.foreach(e => throw e)
}
}
@@ -735,7 +958,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
val deleteResponses = retryRequestsUntilConnected(deleteRequests)
deleteResponses.foreach { deleteResponse =>
if (deleteResponse.resultCode != Code.OK && deleteResponse.resultCode != Code.NONODE) {
- throw deleteResponse.resultException.get
+ deleteResponse.resultException.foreach(e => throw e)
}
}
}
@@ -790,7 +1013,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
case _ => throw deleteResponse.resultException.get
}
}
-
+
/**
* Deletes the zk node recursively
* @param path
@@ -889,7 +1112,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
}
- /**
+ /**
* Set the committed offset for a topic partition and group
* @param group the group whose offset is being set
* @param topicPartition the topic partition whose offset is being set
@@ -898,8 +1121,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
def setOrCreateConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): Unit = {
val setDataResponse = setConsumerOffset(group, topicPartition, offset)
if (setDataResponse.resultCode == Code.NONODE) {
- val createResponse = createConsumerOffset(group, topicPartition, offset)
- createResponse.resultException.foreach(e => throw e)
+ createConsumerOffset(group, topicPartition, offset)
} else {
setDataResponse.resultException.foreach(e => throw e)
}
@@ -911,17 +1133,9 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
retryRequestUntilConnected(setDataRequest)
}
- private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): CreateResponse = {
+ private def createConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long) = {
val path = ConsumerOffset.path(group, topicPartition.topic, topicPartition.partition)
- val createRequest = CreateRequest(path, ConsumerOffset.encode(offset), acls(path), CreateMode.PERSISTENT)
- var createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode == Code.NONODE) {
- val indexOfLastSlash = path.lastIndexOf("/")
- if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
- createRecursive(path.substring(0, indexOfLastSlash))
- createResponse = retryRequestUntilConnected(createRequest)
- }
- createResponse
+ createRecursive(path, ConsumerOffset.encode(offset))
}
/**
@@ -955,21 +1169,40 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
}
}
- private[zk] def createRecursive(path: String): Unit = {
- val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
- var createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode == Code.NONODE) {
+ private[zk] def createRecursive(path: String, data: Array[Byte] = null, throwIfPathExists: Boolean = true) = {
+
+ def parentPath(path: String): String = {
val indexOfLastSlash = path.lastIndexOf("/")
if (indexOfLastSlash == -1) throw new IllegalArgumentException(s"Invalid path ${path}")
- val parentPath = path.substring(0, indexOfLastSlash)
- createRecursive(parentPath)
- createResponse = retryRequestUntilConnected(createRequest)
- if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+ path.substring(0, indexOfLastSlash)
+ }
+
+ def createRecursive0(path: String): Unit = {
+ val createRequest = CreateRequest(path, null, acls(path), CreateMode.PERSISTENT)
+ var createResponse = retryRequestUntilConnected(createRequest)
+ if (createResponse.resultCode == Code.NONODE) {
+ createRecursive0(parentPath(path))
+ createResponse = retryRequestUntilConnected(createRequest)
+ if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
+ throw createResponse.resultException.get
+ }
+ } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
throw createResponse.resultException.get
}
- } else if (createResponse.resultCode != Code.OK && createResponse.resultCode != Code.NODEEXISTS) {
- throw createResponse.resultException.get
}
+
+ val createRequest = CreateRequest(path, data, acls(path), CreateMode.PERSISTENT)
+ var createResponse = retryRequestUntilConnected(createRequest)
+
+ if (throwIfPathExists && createResponse.resultCode == Code.NODEEXISTS) {
+ createResponse.resultException.foreach(e => throw e)
+ } else if (createResponse.resultCode == Code.NONODE) {
+ createRecursive0(parentPath(path))
+ createResponse = retryRequestUntilConnected(createRequest)
+ createResponse.resultException.foreach(e => throw e)
+ } else if (createResponse.resultCode != Code.NODEEXISTS)
+ createResponse.resultException.foreach(e => throw e)
+
}
private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 4c618a0..a0085cd 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -28,8 +28,6 @@ import kafka.utils.Json
import org.apache.kafka.common.TopicPartition
import org.apache.zookeeper.data.Stat
-import scala.collection.Seq
-
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes).
object ControllerZNode {
@@ -140,16 +138,29 @@ object ConfigEntityZNode {
import scala.collection.JavaConverters._
Json.encodeAsBytes(Map("version" -> 1, "config" -> config.asScala))
}
- def decode(bytes: Array[Byte]): Option[Properties] = {
- Json.parseBytes(bytes).map { js =>
- val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
- val props = new Properties()
- configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
- props
+ def decode(bytes: Array[Byte]): Properties = {
+ val props = new Properties()
+ if (bytes != null) {
+ Json.parseBytes(bytes).map { js =>
+ val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+ configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+ }
}
+ props
}
}
+object ConfigEntityChangeNotificationZNode {
+ def path = s"${ConfigZNode.path}/changes"
+}
+
+object ConfigEntityChangeNotificationSequenceZNode {
+ val SequenceNumberPrefix = "config_change_"
+ def createPath = s"${ConfigEntityChangeNotificationZNode.path}/$SequenceNumberPrefix"
+ def encode(sanitizedEntityPath : String): Array[Byte] = Json.encodeAsBytes(Map("version" -> 2, "entity_path" -> sanitizedEntityPath))
+ def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
object IsrChangeNotificationZNode {
def path = "/isr_change_notification"
}
[2/3] kafka git commit: KAFKA-5646;
Use KafkaZkClient in DynamicConfigManager and AdminManager
Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index 47d487e..3f51528 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -32,7 +32,7 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness with RackAw
"--replication-factor", replicationFactor.toString,
"--disable-rack-aware",
"--topic", "foo"))
- kafka.admin.TopicCommand.createTopic(zkUtils, createOpts)
+ kafka.admin.TopicCommand.createTopic(zkClient, createOpts)
val topicJson = """{"topics": [{"topic": "foo"}], "version":1}"""
val (proposedAssignment, currentAssignment) = ReassignPartitionsCommand.generateAssignment(zkUtils,
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 522fcd3..b701f13 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -19,7 +19,6 @@ import java.util.regex.Pattern
import java.util.{ArrayList, Collections, Properties}
import kafka.admin.AdminClient
-import kafka.admin.AdminUtils
import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
import kafka.common.TopicAndPartition
@@ -447,9 +446,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
*/
@Test
def testAuthorizationWithTopicNotExisting() {
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
- AdminUtils.deleteTopic(zkUtils, deleteTopic)
+ adminZkClient.deleteTopic(deleteTopic)
TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers)
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index eadb488..0badda9 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,7 +22,6 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import collection.JavaConverters._
-import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.log.LogConfig
import kafka.server.KafkaConfig
@@ -375,7 +374,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
case (topicPartition, replicas) => topicPartition.partition -> replicas
}
- AdminUtils.addPartitions(zkUtils, topic, existingAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2)
+ adminZkClient.addPartitions(topic, existingAssignment, adminZkClient.getBrokerMetadatas(), 2)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index 383f139..3e08327 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -16,7 +16,6 @@ package kafka.api
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Sanitizer
@@ -54,6 +53,6 @@ class ClientIdQuotaTest extends BaseQuotaTest {
}
private def updateQuotaOverride(clientId: String, properties: Properties) {
- AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), properties)
+ adminZkClient.changeClientIdConfig(Sanitizer.sanitize(clientId), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index cb5262d..0ae9d17 100644
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -18,7 +18,7 @@ package kafka.api
import java.util.Properties
-import kafka.admin.{RackAwareMode, AdminUtils, RackAwareTest}
+import kafka.admin.{RackAwareMode, RackAwareTest}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
@@ -55,7 +55,7 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa
val assignment = zkUtils.getReplicaAssignmentForTopics(Seq(topic)).map { case (topicPartition, replicas) =>
topicPartition.partition -> replicas
}
- val brokerMetadatas = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced)
+ val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index e25f886..453ac91 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
import java.io.File
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server._
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Sanitizer
@@ -41,7 +40,7 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
+ adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, defaultProps)
waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
@@ -59,11 +58,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
override def removeQuotaOverrides() {
val emptyProps = new Properties
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps)
}
private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index b5d88c0..91a92fa 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -17,7 +17,6 @@ package kafka.api
import java.io.File
import java.util.Properties
-import kafka.admin.AdminUtils
import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
import kafka.utils.JaasTestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -45,7 +44,7 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
super.setUp()
val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
+ adminZkClient.changeUserOrUserClientIdConfig(ConfigEntityName.Default, defaultProps)
waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota, defaultRequestQuota)
}
@@ -67,6 +66,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
}
private def updateQuotaOverride(properties: Properties) {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal), properties)
+ adminZkClient.changeUserOrUserClientIdConfig(Sanitizer.sanitize(userPrincipal), properties)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
index 075b1af..10c7737 100644
--- a/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
+++ b/core/src/test/scala/integration/kafka/server/ReplicaFetcherThreadFatalErrorTest.scala
@@ -19,7 +19,6 @@ package kafka.server
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.admin.AdminUtils
import kafka.cluster.BrokerEndPoint
import kafka.server.ReplicaFetcherThread.{FetchRequest, PartitionData}
import kafka.utils.{Exit, TestUtils, ZkUtils}
@@ -60,7 +59,7 @@ class ReplicaFetcherThreadFatalErrorTest extends ZooKeeperTestHarness {
// Unlike `TestUtils.createTopic`, this doesn't wait for metadata propagation as the broker shuts down before
// the metadata is propagated.
def createTopic(zkUtils: ZkUtils, topic: String): Unit = {
- AdminUtils.createTopic(zkUtils, topic, partitions = 1, replicationFactor = 2)
+ adminZkClient.createTopic(topic, partitions = 1, replicationFactor = 2)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 79fc68f..d79ba32 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -73,7 +73,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testWrongReplicaCount(): Unit = {
try {
- AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+ adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2))))
fail("Add partitions should fail")
} catch {
@@ -84,7 +84,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testMissingPartition0(): Unit = {
try {
- AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+ adminZkClient.addPartitions(topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
fail("Add partitions should fail")
} catch {
@@ -95,7 +95,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testIncrementPartitions(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+ adminZkClient.addPartitions(topic1, topic1Assignment, adminZkClient.getBrokerMetadatas(), 3)
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -123,7 +123,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testManualAssignmentOfReplicas(): Unit = {
// Add 2 partitions
- AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3,
+ adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3,
Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
// wait until leader is elected
val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -152,7 +152,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testReplicaPlacementAllServers(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 7)
+ adminZkClient.addPartitions(topic3, topic3Assignment, adminZkClient.getBrokerMetadatas(), 7)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -179,7 +179,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
@Test
def testReplicaPlacementPartialServers(): Unit = {
- AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
+ adminZkClient.addPartitions(topic2, topic2Assignment, adminZkClient.getBrokerMetadatas(), 3)
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 87ce46e..acac907 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -21,8 +21,8 @@ import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.common.InvalidConfigException
import kafka.server.ConfigEntityName
-import kafka.utils.{Logging, ZkUtils}
-import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.Logging
+import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.common.security.scram.ScramCredentialUtils
import org.apache.kafka.common.utils.Sanitizer
import org.easymock.EasyMock
@@ -95,7 +95,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
def shouldFailIfUnrecognisedEntityType(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test
@@ -106,14 +106,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeClientIdConfig(clientId: String, configChange: Properties): Unit = {
assertEquals("my-client-id", clientId)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -124,14 +125,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeTopicConfig(topic: String, configChange: Properties): Unit = {
assertEquals("my-topic", topic)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -142,14 +144,15 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=d"))
- val configChange = new TestAdminUtils {
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("b", configChange.get("a"))
assertEquals("d", configChange.get("c"))
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -160,15 +163,18 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--add-config", "a=b,c=[d,e ,f],g=[h,i]"))
- val configChange = new TestAdminUtils {
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals(Seq(1), brokerIds)
assertEquals("b", configChange.get("a"))
assertEquals("d,e ,f", configChange.get("c"))
assertEquals("h,i", configChange.get("g"))
}
+
+ override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -178,7 +184,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=b"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -188,7 +194,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a="))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[IllegalArgumentException])
@@ -198,7 +204,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "brokers",
"--alter",
"--add-config", "a=[b,c,d=e"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test (expected = classOf[InvalidConfigException])
@@ -208,7 +214,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--entity-type", "topics",
"--alter",
"--delete-config", "missing_config1, missing_config2"))
- ConfigCommand.alterConfig(null, createOpts, new TestAdminUtils)
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
@Test
@@ -219,8 +225,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--alter",
"--delete-config", "a,c"))
- val configChange = new TestAdminUtils {
- override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+ case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
val properties: Properties = new Properties
properties.put("a", "b")
properties.put("c", "d")
@@ -228,12 +234,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
properties
}
- override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configChange: Properties): Unit = {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = {
assertEquals("f", configChange.get("e"))
assertEquals(1, configChange.size())
}
}
- ConfigCommand.alterConfig(null, createOpts, configChange)
+
+ ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient))
}
@Test
@@ -253,11 +260,11 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
"--delete-config", mechanism))
val credentials = mutable.Map[String, Properties]()
- case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends TestAdminUtils {
- override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String): Properties = {
+ case class CredentialChange(val user: String, val mechanisms: Set[String], val iterations: Int) extends AdminZkClient(zkClient) {
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {
credentials.getOrElse(entityName, new Properties())
}
- override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configChange: Properties): Unit = {
+ override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configChange: Properties): Unit = {
assertEquals(user, sanitizedEntityName)
assertEquals(mechanisms, configChange.keySet().asScala)
for (mechanism <- mechanisms) {
@@ -397,17 +404,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
@Test
def testQuotaDescribeEntities() {
- val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+ val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
expectedFetches.foreach {
- case (name, values) => EasyMock.expect(zkUtils.getAllEntitiesWithConfig(name)).andReturn(values)
+ case (name, values) => EasyMock.expect(zkClient.getAllEntitiesWithConfig(name)).andReturn(values)
}
- EasyMock.replay(zkUtils)
- val entities = entity.getAllEntities(zkUtils)
+ EasyMock.replay(zkClient)
+ val entities = entity.getAllEntities(zkClient)
assertEquals(expectedEntityNames, entities.map(e => e.fullSanitizedName))
- EasyMock.reset(zkUtils)
+ EasyMock.reset(zkClient)
}
val clientId = "a-client"
@@ -456,4 +463,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
Map("users" -> Seq("<default>", sanitizedPrincipal)) ++ defaultUserMap ++ userMap,
Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
}
+
+ case class DummyAdminZkClient(zkClient: KafkaZkClient) extends AdminZkClient(zkClient) {
+ override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): Unit = {}
+ override def fetchEntityConfig(entityType: String, entityName: String): Properties = {new Properties}
+ override def changeClientIdConfig(clientId: String, configs: Properties): Unit = {}
+ override def changeUserOrUserClientIdConfig(sanitizedEntityName: String, configs: Properties): Unit = {}
+ override def changeTopicConfig(topic: String, configs: Properties): Unit = {}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 78f022a..1922aaf 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -46,7 +46,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = "test"
servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
}
@@ -61,7 +61,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// check if all replicas but the one that is shut down has deleted the log
TestUtils.waitUntilTrue(() =>
servers.filter(s => s.config.brokerId != follower.config.brokerId)
@@ -86,7 +86,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// shut down the controller to trigger controller failover during delete topic
controller.shutdown()
@@ -112,7 +112,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
this.servers = allServers
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
"Replicas for topic test not created.")
@@ -121,7 +121,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
follower.shutdown()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
// the topic is being deleted
// reassign partition 0
@@ -155,16 +155,16 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
val newPartition = new TopicPartition(topic, 1)
// capture the brokers before we shutdown so that we don't fail validation in `addPartitions`
- val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val brokers = adminZkClient.getBrokerMetadatas()
follower.shutdown()
// wait until the broker has been removed from ZK to reduce non-determinism
TestUtils.waitUntilTrue(() => zkUtils.getBrokerInfo(follower.config.brokerId).isEmpty,
s"Follower ${follower.config.brokerId} was not removed from ZK")
// add partitions to topic
- AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+ adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
follower.startup()
// test if topic deletion is resumed
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
@@ -178,12 +178,12 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
def testAddPartitionDuringDeleteTopic() {
val topic = "test"
servers = createTestTopicAndCluster(topic)
- val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
+ val brokers = adminZkClient.getBrokerMetadatas()
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// add partitions to topic
val newPartition = new TopicPartition(topic, 1)
- AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+ adminZkClient.addPartitions(topic, expectedReplicaAssignment, brokers, 2,
Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// verify that new partition doesn't exist on any broker either
@@ -198,10 +198,10 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topicPartition = new TopicPartition(topic, 0)
servers = createTestTopicAndCluster(topic)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
// re-create topic on same replicas
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until leader is elected
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topic, 0, 1000)
// check if all replica logs are created
@@ -216,7 +216,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
servers = createTestTopicAndCluster(topic)
// start topic deletion
try {
- AdminUtils.deleteTopic(zkUtils, "test2")
+ adminZkClient.deleteTopic("test2")
fail("Expected UnknownTopicOrPartitionException")
} catch {
case _: UnknownTopicOrPartitionException => // expected exception
@@ -258,7 +258,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
server.logManager.cleaner.awaitCleaned(new TopicPartition(topicName, 0), 0)
// delete topic
- AdminUtils.deleteTopic(zkUtils, "test")
+ adminZkClient.deleteTopic("test")
TestUtils.verifyTopicDeletion(zkUtils, "test", 1, servers)
}
@@ -270,9 +270,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
try {
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
// try to delete topic marked as deleted
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
fail("Expected TopicAlreadyMarkedForDeletionException")
}
catch {
@@ -293,7 +293,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
// create brokers
val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
// create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, expectedReplicaAssignment)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
// wait until replica log is created on every broker
TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
"Replicas for topic test not created")
@@ -316,7 +316,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
val topic = topicPartition.topic
servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
// mark the topic for deletion
- AdminUtils.deleteTopic(zkUtils, "test")
+ adminZkClient.deleteTopic("test")
TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
"Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
// verify that topic test is untouched
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 7000308..e367372 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -56,7 +56,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
@Before
override def setUp() {
super.setUp()
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
}
@After
@@ -234,7 +234,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
def testDescribeWithMultiPartitionTopicAndMultipleConsumersWithNewConsumer() {
TestUtils.createOffsetsTopic(zkUtils, servers)
val topic2 = "foo2"
- AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+ adminZkClient.createTopic(topic2, 2, 1)
// run two consumers in the group consuming from a two-partition topic
consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
index 6727fad..4b22898 100644
--- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala
@@ -30,7 +30,6 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
-
class ListConsumerGroupTest extends KafkaServerTestHarness {
val overridingProps = new Properties()
@@ -47,7 +46,7 @@ class ListConsumerGroupTest extends KafkaServerTestHarness {
override def setUp() {
super.setUp()
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
props.setProperty("group.id", group)
props.setProperty("zookeeper.connect", zkConnect)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
index 6853b16..8227487 100644
--- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala
@@ -88,7 +88,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = new KafkaConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000)
@@ -98,13 +98,13 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 1 (specific offset).")
printConsumerGroup("new.group")
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
consumerGroupCommand.close()
}
@Test
def testResetOffsetsToLocalDateTime() {
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
val calendar = Calendar.getInstance()
@@ -144,12 +144,12 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
def testResetOffsetsToZonedDateTime() {
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000)
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")
@@ -188,7 +188,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -230,7 +230,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -241,7 +241,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -250,7 +250,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -260,7 +260,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 100 (latest by duration).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -269,7 +269,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -279,7 +279,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -288,7 +288,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -301,7 +301,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 200 (latest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -310,7 +310,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -322,7 +322,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 100 (current).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) {
@@ -351,7 +351,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -362,7 +362,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 1 (specific offset).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -371,7 +371,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -383,7 +383,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 150 (current + 50).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -392,7 +392,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -405,7 +405,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 50 (current - 50).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -414,7 +414,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -426,7 +426,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest by shift).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -435,7 +435,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -447,7 +447,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 200 (latest by shift).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -456,7 +456,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
@@ -466,7 +466,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -475,7 +475,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
@@ -485,7 +485,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
@Test
@@ -498,8 +498,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 1, 1)
- AdminUtils.createTopic(zkUtils, topic2, 1, 1)
+ adminZkClient.createTopic(topic1, 1, 1)
+ adminZkClient.createTopic(topic2, 1, 1)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic1, 100)
produceConsumeAndShutdown(consumerGroupCommand, 1, topic2, 100)
@@ -511,8 +511,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest).")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
- AdminUtils.deleteTopic(zkUtils, topic2)
+ adminZkClient.deleteTopic(topic1)
+ adminZkClient.deleteTopic(topic2)
}
@Test
@@ -525,8 +525,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
- AdminUtils.createTopic(zkUtils, topic2, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
+ adminZkClient.createTopic(topic2, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic2, 100)
@@ -538,8 +538,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
}, "Expected the consumer group to reset to offset 0 (earliest) in partition 1.")
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
- AdminUtils.deleteTopic(zkUtils, topic2)
+ adminZkClient.deleteTopic(topic1)
+ adminZkClient.deleteTopic(topic2)
}
@Test
@@ -548,7 +548,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
val opts = new ConsumerGroupCommandOptions(cgcArgs)
val consumerGroupCommand = createConsumerGroupService(opts)
- AdminUtils.createTopic(zkUtils, topic1, 2, 1)
+ adminZkClient.createTopic(topic1, 2, 1)
produceConsumeAndShutdown(consumerGroupCommand, 2, topic1, 100)
@@ -575,7 +575,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness {
file.deleteOnExit()
printConsumerGroup()
- AdminUtils.deleteTopic(zkUtils, topic1)
+ adminZkClient.deleteTopic(topic1)
}
private def printConsumerGroup() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index 53efa34..180f257 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -44,8 +44,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
"--replication-factor", "1",
"--config", cleanupKey + "=" + cleanupVal,
"--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
- val props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ TopicCommand.createTopic(zkClient, createOpts)
+ val props = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertTrue("Properties after creation don't contain " + cleanupKey, props.containsKey(cleanupKey))
assertTrue("Properties after creation have incorrect value", props.getProperty(cleanupKey).equals(cleanupVal))
@@ -55,8 +55,8 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// modify the topic to add new partitions
val numPartitionsModified = 3
val alterOpts = new TopicCommandOptions(Array("--partitions", numPartitionsModified.toString, "--topic", topic))
- TopicCommand.alterTopic(zkUtils, alterOpts)
- val newProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
+ TopicCommand.alterTopic(zkClient, alterOpts)
+ val newProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
assertTrue("Updated properties do not contain " + cleanupKey, newProps.containsKey(cleanupKey))
assertTrue("Updated properties have incorrect value", newProps.getProperty(cleanupKey).equals(cleanupVal))
}
@@ -75,27 +75,27 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
val createOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", normalTopic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// delete the NormalTopic
val deleteOpts = new TopicCommandOptions(Array("--topic", normalTopic))
val deletePath = getDeleteTopicPath(normalTopic)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deletePath))
- TopicCommand.deleteTopic(zkUtils, deleteOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOpts)
assertTrue("Delete path for topic should exist after deletion.", zkUtils.pathExists(deletePath))
// create the offset topic
val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
"--replication-factor", "1",
"--topic", Topic.GROUP_METADATA_TOPIC_NAME))
- TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
+ TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
// try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME))
val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME)
assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
intercept[AdminOperationException] {
- TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)
}
assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.pathExists(deleteOffsetTopicPath))
}
@@ -109,12 +109,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// delete a topic that does not exist without --if-exists
val deleteOpts = new TopicCommandOptions(Array("--topic", "test"))
intercept[IllegalArgumentException] {
- TopicCommand.deleteTopic(zkUtils, deleteOpts)
+ TopicCommand.deleteTopic(zkClient, deleteOpts)
}
// delete a topic that does not exist with --if-exists
val deleteExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--if-exists"))
- TopicCommand.deleteTopic(zkUtils, deleteExistsOpts)
+ TopicCommand.deleteTopic(zkClient, deleteExistsOpts)
}
@Test
@@ -126,12 +126,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// alter a topic that does not exist without --if-exists
val alterOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1"))
intercept[IllegalArgumentException] {
- TopicCommand.alterTopic(zkUtils, alterOpts)
+ TopicCommand.alterTopic(zkClient, alterOpts)
}
// alter a topic that does not exist with --if-exists
val alterExistsOpts = new TopicCommandOptions(Array("--topic", "test", "--partitions", "1", "--if-exists"))
- TopicCommand.alterTopic(zkUtils, alterExistsOpts)
+ TopicCommand.alterTopic(zkClient, alterExistsOpts)
}
@Test
@@ -146,17 +146,17 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
// create the topic
val createOpts = new TopicCommandOptions(
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// try to re-create the topic without --if-not-exists
intercept[TopicExistsException] {
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
}
// try to re-create the topic with --if-not-exists
val createNotExistsOpts = new TopicCommandOptions(
Array("--partitions", numPartitions.toString, "--replication-factor", "1", "--topic", topic, "--if-not-exists"))
- TopicCommand.createTopic(zkUtils, createNotExistsOpts)
+ TopicCommand.createTopic(zkClient, createNotExistsOpts)
}
@Test
@@ -170,7 +170,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
"--partitions", numPartitions.toString,
"--replication-factor", replicationFactor.toString,
"--topic", "foo"))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
var assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
@@ -182,7 +182,7 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
val alterOpts = new TopicCommandOptions(Array(
"--partitions", alteredNumPartitions.toString,
"--topic", "foo"))
- TopicCommand.alterTopic(zkUtils, alterOpts)
+ TopicCommand.alterTopic(zkClient, alterOpts)
assignment = zkUtils.getReplicaAssignmentForTopics(Seq("foo")).map { case (tp, replicas) =>
tp.partition -> replicas
}
@@ -198,28 +198,28 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
TestUtils.createBrokersInZk(zkUtils, brokers)
val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor", "1", "--topic", topic))
- TopicCommand.createTopic(zkUtils, createOpts)
+ TopicCommand.createTopic(zkClient, createOpts)
// delete the broker first, so when we attempt to delete the topic it gets into "marked for deletion"
TestUtils.deleteBrokersInZk(zkUtils, brokers)
- TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic)))
+ TopicCommand.deleteTopic(zkClient, new TopicCommandOptions(Array("--topic", topic)))
// Test describe topics
def describeTopicsWithConfig() {
- TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe")))
+ TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe")))
}
val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig)
assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe))
def describeTopicsNoConfig() {
- TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
+ TopicCommand.describeTopic(zkClient, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
}
val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig)
assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe))
// Test list topics
def listTopics() {
- TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list")))
+ TopicCommand.listTopics(zkClient, new TopicCommandOptions(Array("--list")))
}
val output = TestUtils.grabConsoleOutput(listTopics)
assertTrue(output.contains(topic) && output.contains(markedForDeletionList))
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 04f9bea..32e23cc 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -20,7 +20,6 @@ package kafka.controller
import java.util.Properties
import java.util.concurrent.CountDownLatch
-import kafka.admin.AdminUtils
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils._
@@ -60,7 +59,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
}
val initialEpoch = initialController.epoch
// Create topic with one partition
- AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val topicPartition = new TopicPartition("topic1", 0)
TestUtils.waitUntilTrue(() =>
initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index bec5026..daa4276 100644
--- a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -26,7 +26,6 @@ import org.junit.{Before, Test}
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
-
class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with Logging {
private val nodesNum = 3
@@ -136,7 +135,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
// Create topics
for (t <- topics if running) {
try {
- kafka.admin.AdminUtils.createTopic(zkUtils, t, partitionNum, replicationFactor)
+ adminZkClient.createTopic(t, partitionNum, replicationFactor)
} catch {
case e: Exception => e.printStackTrace
}
@@ -146,7 +145,7 @@ class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness with
// Delete topics
for (t <- topics if running) {
try {
- kafka.admin.AdminUtils.deleteTopic(zkUtils, t)
+ adminZkClient.deleteTopic(t)
} catch {
case e: Exception => e.printStackTrace
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 6e9e8ff..e93dcf6 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,7 +17,6 @@
package kafka.integration
-import kafka.admin.AdminUtils
import kafka.api.TopicMetadataResponse
import kafka.client.ClientUtils
import kafka.cluster.BrokerEndPoint
@@ -219,7 +218,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
// create topic
val topic: String = "test"
- AdminUtils.createTopic(zkUtils, topic, 1, numBrokers)
+ adminZkClient.createTopic(topic, 1, numBrokers)
// shutdown a broker
adHocServers.last.shutdown()
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
index 24421d0..7732e38 100755
--- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala
@@ -25,7 +25,6 @@ import org.apache.log4j.{Level, Logger}
import java.util.Properties
import java.util.concurrent.ExecutionException
-import kafka.admin.AdminUtils
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.serializer.StringDecoder
import kafka.server.{KafkaConfig, KafkaServer}
@@ -109,7 +108,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
startBrokers(Seq(configProps1, configProps2))
// create topic with 1 partition, 2 replicas, one on each broker
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
verifyUncleanLeaderElectionEnabled
}
@@ -121,7 +120,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
startBrokers(Seq(configProps1, configProps2))
// create topic with 1 partition, 2 replicas, one on each broker
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)))
verifyUncleanLeaderElectionDisabled
}
@@ -136,7 +135,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election enabled
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "true")
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
topicProps)
verifyUncleanLeaderElectionEnabled
@@ -153,7 +152,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
// create topic with 1 partition, 2 replicas, one on each broker, and unclean leader election disabled
val topicProps = new Properties()
topicProps.put("unclean.leader.election.enable", "false")
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1, brokerId2)),
topicProps)
verifyUncleanLeaderElectionDisabled
@@ -168,7 +167,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness {
topicProps.put("unclean.leader.election.enable", "invalid")
intercept[ConfigException] {
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(partitionId -> Seq(brokerId1)), topicProps)
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(partitionId -> Seq(brokerId1)), topicProps)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index 0dcff53..c250d85 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -28,7 +28,6 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server._
import kafka.serializer._
import kafka.utils._
-import kafka.admin.AdminUtils
import kafka.utils.TestUtils._
import scala.collection._
@@ -73,8 +72,8 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@Test
def testMetricsReporterAfterDeletingTopic() {
val topic = "test-topic-metric"
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.createTopic(topic, 1, 1)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
}
@@ -82,13 +81,13 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
@Test
def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
val topic = "test-broker-topic-metric"
- AdminUtils.createTopic(zkUtils, topic, 2, 1)
+ adminZkClient.createTopic(topic, 2, 1)
// Produce a few messages to create the metrics
// Don't consume messages as it may cause metrics to be re-created causing the test to fail, see KAFKA-5238
TestUtils.produceMessages(servers, topic, nMessages)
assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
servers.foreach(s => assertNotNull(s.brokerTopicStats.topicStats(topic)))
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
assertEquals("Topic metrics exists after deleteTopic", Set.empty, topicMetricGroups(topic))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index fa1174d..a0680a2 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -125,7 +125,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
props.put("request.required.acks", "0")
val producer = new SyncProducer(new SyncProducerConfig(props))
- AdminUtils.createTopic(zkUtils, "test", 1, 1)
+ adminZkClient.createTopic("test", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0)
// This message will be dropped silently since message size too large.
@@ -167,9 +167,9 @@ class SyncProducerTest extends KafkaServerTestHarness {
}
// #2 - test that we get correct offsets when partition is owned by broker
- AdminUtils.createTopic(zkUtils, "topic1", 1, 1)
+ adminZkClient.createTopic("topic1", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic1", 0)
- AdminUtils.createTopic(zkUtils, "topic3", 1, 1)
+ adminZkClient.createTopic("topic3", 1, 1)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "topic3", 0)
val response2 = producer.send(request)
@@ -243,7 +243,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
val producer = new SyncProducer(new SyncProducerConfig(props))
val topicProps = new Properties()
topicProps.put("min.insync.replicas","2")
- AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps)
+ adminZkClient.createTopic(topicName, 1, 1,topicProps)
TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0)
val response = producer.send(produceRequest(topicName, 0,
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index dd7bb5f..85bd6a1 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -26,7 +26,7 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
-import kafka.admin.{AdminOperationException, AdminUtils}
+import kafka.admin.AdminOperationException
import org.apache.kafka.common.TopicPartition
import scala.collection.Map
@@ -43,14 +43,14 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val tp = new TopicPartition("test", 0)
val logProps = new Properties()
logProps.put(FlushMessagesProp, oldVal.toString)
- AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
+ adminZkClient.createTopic(tp.topic, 1, 1, logProps)
TestUtils.retry(10000) {
val logOpt = this.servers.head.logManager.getLog(tp)
assertTrue(logOpt.isDefined)
assertEquals(oldVal, logOpt.get.config.flushInterval)
}
logProps.put(FlushMessagesProp, newVal.toString)
- AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
+ adminZkClient.changeTopicConfig(tp.topic, logProps)
TestUtils.retry(10000) {
assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
}
@@ -65,8 +65,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val quotaManagers = servers.head.apis.quotas
rootEntityType match {
- case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, props)
- case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, props)
+ case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
+ case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
}
TestUtils.retry(10000) {
@@ -84,8 +84,8 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
val emptyProps = new Properties()
rootEntityType match {
- case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, configEntityName, emptyProps)
- case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, configEntityName, emptyProps)
+ case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, emptyProps)
+ case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, emptyProps)
}
TestUtils.retry(10000) {
val producerQuota = quotaManagers.produce.quota(user, clientId)
@@ -142,9 +142,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
userClientIdProps.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "100000")
userClientIdProps.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "200000")
- AdminUtils.changeClientIdConfig(zkUtils, "overriddenClientId", clientIdProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "overriddenUser", userProps)
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
+ adminZkClient.changeClientIdConfig("overriddenClientId", clientIdProps)
+ adminZkClient.changeUserOrUserClientIdConfig("overriddenUser", userProps)
+ adminZkClient.changeUserOrUserClientIdConfig("ANONYMOUS/clients/overriddenUserClientId", userClientIdProps)
// Remove config change znodes to force quota initialization only through loading of user/client quotas
zkUtils.getChildren(ZkUtils.ConfigChangesPath).foreach { p => zkUtils.deletePath(ZkUtils.ConfigChangesPath + "/" + p) }
@@ -165,7 +165,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
try {
val logProps = new Properties()
logProps.put(FlushMessagesProp, 10000: java.lang.Integer)
- AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
+ adminZkClient.changeTopicConfig(topic, logProps)
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
case _: AdminOperationException => // expected
@@ -187,7 +187,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
EasyMock.expectLastCall().once()
EasyMock.replay(handler)
- val configManager = new DynamicConfigManager(zkUtils, zkClient, Map(ConfigType.Topic -> handler))
+ val configManager = new DynamicConfigManager(zkClient, Map(ConfigType.Topic -> handler))
// Notifications created using the old TopicConfigManager are ignored.
configManager.ConfigChangedNotificationHandler.processNotification("not json")
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index 9e6b1b2..b2378cf 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -16,50 +16,39 @@
*/
package kafka.server
-import kafka.admin.AdminUtils
-import kafka.utils.ZkUtils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.kafka.common.config._
-import org.easymock.EasyMock
-import org.junit.{Before, Test}
import kafka.utils.CoreUtils._
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.config._
+import org.junit.Test
-class DynamicConfigTest {
+class DynamicConfigTest extends ZooKeeperTestHarness {
private final val nonExistentConfig: String = "some.config.that.does.not.exist"
private final val someValue: String = "some interesting value"
- var zkUtils: ZkUtils = _
-
- @Before
- def setUp() {
- val zkClient = EasyMock.createMock(classOf[ZkClient])
- zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
- }
-
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingBrokerUnknownConfig() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0), propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingClientIdUnknownConfig() {
- AdminUtils.changeClientIdConfig(zkUtils, "ClientId", propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[IllegalArgumentException])
def shouldFailWhenChangingUserUnknownConfig() {
- AdminUtils.changeUserOrUserClientIdConfig(zkUtils, "UserId", propsWith(nonExistentConfig, someValue))
+ adminZkClient.changeUserOrUserClientIdConfig("UserId", propsWith(nonExistentConfig, someValue))
}
@Test(expected = classOf[ConfigException])
def shouldFailLeaderConfigsWithInvalidValues() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+ adminZkClient.changeBrokerConfig(Seq(0),
propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, "-100"))
}
@Test(expected = classOf[ConfigException])
def shouldFailFollowerConfigsWithInvalidValues() {
- AdminUtils.changeBrokerConfig(zkUtils, Seq(0),
+ adminZkClient.changeBrokerConfig(Seq(0),
propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cabbd5d..a51acd0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -90,7 +90,6 @@ class KafkaApisTest {
groupCoordinator,
txnCoordinator,
controller,
- zkUtils,
zkClient,
brokerId,
new KafkaConfig(properties),
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 306dbc0..b0a7d72 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, Random}
-import kafka.admin.AdminUtils
import kafka.api.{FetchRequestBuilder, OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
@@ -81,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -116,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 1, 1)
+ adminZkClient.createTopic(topic, 1, 1)
val logManager = server.getLogManager
waitUntilTrue(() => logManager.getLog(new TopicPartition(topic, part)).isDefined,
@@ -179,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 3, 1)
+ adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
@@ -208,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
val part = Integer.valueOf(topicPartition.split("-").last).intValue
// setup brokers in ZooKeeper as owners of partitions for this test
- AdminUtils.createTopic(zkUtils, topic, 3, 1)
+ adminZkClient.createTopic(topic, 3, 1)
val logManager = server.getLogManager
val log = logManager.getOrCreateLog(new TopicPartition(topic, part), logManager.defaultConfig)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index bb7e9fe..61e17e3 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -30,8 +30,6 @@ import org.junit.Assert._
import java.util.Properties
import java.io.File
-import kafka.admin.AdminUtils
-
import scala.util.Random
import scala.collection._
@@ -319,7 +317,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
assertEquals(Errors.NONE, commitResponse.commitStatus.get(topicPartition).get)
// start topic deletion
- AdminUtils.deleteTopic(zkUtils, topic)
+ adminZkClient.deleteTopic(topic)
TestUtils.verifyTopicDeletion(zkUtils, topic, 1, Seq(server))
Thread.sleep(retentionCheckInterval * 2)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index f162492..45a6bdd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -19,8 +19,6 @@ package kafka.server
import java.util.Properties
-import kafka.admin.AdminUtils
-import kafka.admin.AdminUtils._
import kafka.log.LogConfig._
import kafka.server.KafkaConfig.fromProps
import kafka.server.QuotaType._
@@ -80,7 +78,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Given six partitions, led on nodes 0,1,2,3,4,5 but with followers on node 6,7 (not started yet)
//And two extra partitions 6,7, which we don't intend on throttling.
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(
0 -> Seq(100, 106), //Throttled
1 -> Seq(101, 106), //Throttled
2 -> Seq(102, 106), //Throttled
@@ -99,7 +97,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Set the throttle limit on all 8 brokers, but only assign throttled replicas to the six leaders, or two followers
(100 to 107).foreach { brokerId =>
- changeBrokerConfig(zkUtils, Seq(brokerId),
+ adminZkClient.changeBrokerConfig(Seq(brokerId),
propsWith(
(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString),
(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, throttle.toString)
@@ -108,9 +106,9 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
//Either throttle the six leaders or the two followers
if (leaderThrottle)
- changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
+ adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100,1:101,2:102,3:103,4:104,5:105" ))
else
- changeTopicConfig(zkUtils, topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
+ adminZkClient.changeTopicConfig(topic, propsWith(FollowerReplicationThrottledReplicasProp, "0:106,1:106,2:106,3:107,4:107,5:107"))
//Add data equally to each partition
producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = 1)
@@ -178,7 +176,7 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val config: Properties = createBrokerConfig(100, zkConnect)
config.put("log.segment.bytes", (1024 * 1024).toString)
brokers = Seq(createServer(fromProps(config)))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, Map(0 -> Seq(100, 101)))
+ adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, Map(0 -> Seq(100, 101)))
//Write 20MBs and throttle at 5MB/s
val msg = msg100KB
@@ -187,8 +185,8 @@ class ReplicationQuotasTest extends ZooKeeperTestHarness {
val throttle: Long = msg.length * msgCount / expectedDuration
//Set the throttle to only limit leader
- changeBrokerConfig(zkUtils, Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
- changeTopicConfig(zkUtils, topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
+ adminZkClient.changeBrokerConfig(Seq(100), propsWith(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, throttle.toString))
+ adminZkClient.changeTopicConfig(topic, propsWith(LeaderReplicationThrottledReplicasProp, "0:100"))
//Add data
addData(msgCount, msg)
http://git-wip-us.apache.org/repos/asf/kafka/blob/bc852baf/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 4774e1d..ee75933 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer
import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
-import kafka.admin.AdminUtils
import kafka.log.LogConfig
import kafka.network.RequestChannel.Session
import kafka.security.auth._
@@ -81,9 +80,9 @@ class RequestQuotaTest extends BaseRequestTest {
// Change default client-id request quota to a small value and a single unthrottledClient with a large quota
val quotaProps = new Properties()
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
- AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
+ adminZkClient.changeClientIdConfig("<default>", quotaProps)
quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
- AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps)
+ adminZkClient.changeClientIdConfig(Sanitizer.sanitize(unthrottledClientId), quotaProps)
TestUtils.retry(10000) {
val quotaManager = servers.head.apis.quotas.request