You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 03:34:56 UTC
[kafka] branch trunk updated: MINOR: Replace test usages of
ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b591d7 MINOR: Replace test usages of ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)
0b591d7 is described below
commit 0b591d703b574165fccf3c3a34f984beea140992
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jun 13 20:33:38 2018 -0700
MINOR: Replace test usages of ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)
This applies to tests that are not testing the old consumers' functionality. As part of this,
consolidate `TopicMetadataTest` into `MetadataRequestTest`. Finally,
remove `ProducerBounceTest`, which has no tests left in it.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../integration/kafka/api/ProducerBounceTest.scala | 116 ---------
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 136 +++++-----
.../unit/kafka/integration/TopicMetadataTest.scala | 289 ---------------------
.../unit/kafka/server/MetadataRequestTest.scala | 117 +++++++++
4 files changed, 177 insertions(+), 481 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
deleted file mode 100644
index a11afd3..0000000
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.Future
-
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{ShutdownableThread, TestUtils}
-import kafka.utils.Implicits._
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-class ProducerBounceTest extends KafkaServerTestHarness {
- private val producerBufferSize = 65536
- private val serverMessageMaxBytes = producerBufferSize/2
-
- val numServers = 4
-
- val overridingProps = new Properties()
- overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
- overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
- // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
- // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
- overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
- overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "true")
- overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
- overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
- // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
- // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
- // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
- // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
- // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
- // running server.
- //
- // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
- // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
- override def generateConfigs = {
- FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
- .map(KafkaConfig.fromProps(_, overridingProps))
- }
-
- private val topic1 = "topic-1"
-
- private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
- val numRecords = 1000
- var sent = 0
- var failed = false
-
- val producerConfig = new Properties()
- producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
- producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
- val producerConfigWithCompression = new Properties()
- producerConfigWithCompression ++= producerConfig
- producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
- val producers = List(
- TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),
- TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 2, retries = 10, lingerMs = 5000, props = Some(producerConfig)),
- TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10, lingerMs = 10000, props = Some(producerConfigWithCompression))
- )
-
- override def doWork(): Unit = {
- info("Starting to send messages..")
- var producerId = 0
- val responses = new ArrayBuffer[IndexedSeq[Future[RecordMetadata]]]()
- for (producer <- producers) {
- val response =
- for (i <- sent+1 to sent+numRecords)
- yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, ((producerId + 1) * i).toString.getBytes),
- new ErrorLoggingCallback(topic1, null, null, true))
- responses.append(response)
- producerId += 1
- }
-
- try {
- for (response <- responses) {
- val futures = response.toList
- futures.map(_.get)
- sent += numRecords
- }
- info(s"Sent $sent records")
- } catch {
- case e : Exception =>
- error(s"Got exception ${e.getMessage}")
- e.printStackTrace()
- failed = true
- }
- }
-
- override def shutdown(){
- super.shutdown()
- for (producer <- producers) {
- producer.close()
- }
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 43d8ec8..4d1e4ab 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,24 +17,23 @@
package kafka.admin
-import kafka.api.TopicMetadata
+import kafka.network.SocketServer
import org.junit.Assert._
-import kafka.zk.ZooKeeperTestHarness
import kafka.utils.TestUtils._
import kafka.utils.TestUtils
-import kafka.cluster.Broker
-import kafka.client.ClientUtils
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.BaseRequestTest
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.{After, Before, Test}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.{Before, Test}
-class AddPartitionsTest extends ZooKeeperTestHarness {
- var configs: Seq[KafkaConfig] = null
- var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- var brokers: Seq[Broker] = Seq.empty[Broker]
+import scala.collection.JavaConverters._
+
+class AddPartitionsTest extends BaseRequestTest {
+
+ protected override def numBrokers: Int = 4
val partitionId = 0
@@ -53,22 +52,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
override def setUp() {
super.setUp()
- configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
- // start all the servers
- servers = configs.map(c => TestUtils.createServer(c))
- brokers = servers.map(s => TestUtils.createBroker(s.config.brokerId, s.config.hostName, TestUtils.boundPort(s)))
-
- // create topics first
- createTopic(zkClient, topic1, partitionReplicaAssignment = topic1Assignment, servers = servers)
- createTopic(zkClient, topic2, partitionReplicaAssignment = topic2Assignment, servers = servers)
- createTopic(zkClient, topic3, partitionReplicaAssignment = topic3Assignment, servers = servers)
- createTopic(zkClient, topic4, partitionReplicaAssignment = topic4Assignment, servers = servers)
- }
-
- @After
- override def tearDown() {
- TestUtils.shutdownServers(servers)
- super.tearDown()
+ createTopic(topic1, partitionReplicaAssignment = topic1Assignment)
+ createTopic(topic2, partitionReplicaAssignment = topic2Assignment)
+ createTopic(topic3, partitionReplicaAssignment = topic3Assignment)
+ createTopic(topic4, partitionReplicaAssignment = topic4Assignment)
}
@Test
@@ -108,17 +95,15 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
- val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.brokerEndPoint(listenerName)),
- "AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
- val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
- val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
- assertEquals(partitionDataForTopic1.size, 3)
- assertEquals(partitionDataForTopic1(1).partitionId, 1)
- assertEquals(partitionDataForTopic1(2).partitionId, 2)
- val replicas = partitionDataForTopic1(1).replicas
+ val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
+ assertEquals(1, response.topicMetadata.size)
+ val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition)
+ assertEquals(partitions.size, 3)
+ assertEquals(1, partitions(1).partition)
+ assertEquals(2, partitions(2).partition)
+ val replicas = partitions(1).replicas
assertEquals(replicas.size, 2)
- assert(replicas.contains(partitionDataForTopic1(1).leader.get))
+ assertTrue(replicas.contains(partitions(1).leader))
}
@Test
@@ -137,18 +122,18 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
// read metadata from a broker and verify the new topic partitions exist
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
- brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
- "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
- val metaDataForTopic2 = metadata.filter(_.topic == topic2)
- val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
- assertEquals(3, partitionDataForTopic2.size)
- assertEquals(1, partitionDataForTopic2(1).partitionId)
- assertEquals(2, partitionDataForTopic2(2).partitionId)
- val replicas = partitionDataForTopic2(1).replicas
+ val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+ assertEquals(1, response.topicMetadata.size)
+ val topicMetadata = response.topicMetadata.asScala.head
+ val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
+ assertEquals(3, topicMetadata.partitionMetadata.size)
+ assertEquals(0, partitionMetadata(0).partition)
+ assertEquals(1, partitionMetadata(1).partition)
+ assertEquals(2, partitionMetadata(2).partition)
+ val replicas = topicMetadata.partitionMetadata.get(1).replicas
assertEquals(2, replicas.size)
- assertTrue(replicas.head.id == 0 || replicas.head.id == 1)
- assertTrue(replicas(1).id == 0 || replicas(1).id == 1)
+ assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
+ assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
}
@Test
@@ -163,19 +148,16 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
- brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
- "AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata
-
- val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
-
- validateLeaderAndReplicas(metaDataForTopic3, 0, 2, Set(2, 3, 0, 1))
- validateLeaderAndReplicas(metaDataForTopic3, 1, 3, Set(3, 2, 0, 1))
- validateLeaderAndReplicas(metaDataForTopic3, 2, 0, Set(0, 3, 1, 2))
- validateLeaderAndReplicas(metaDataForTopic3, 3, 1, Set(1, 0, 2, 3))
- validateLeaderAndReplicas(metaDataForTopic3, 4, 2, Set(2, 3, 0, 1))
- validateLeaderAndReplicas(metaDataForTopic3, 5, 3, Set(3, 0, 1, 2))
- validateLeaderAndReplicas(metaDataForTopic3, 6, 0, Set(0, 1, 2, 3))
+ val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
+ assertEquals(1, response.topicMetadata.size)
+ val topicMetadata = response.topicMetadata.asScala.head
+ validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1))
+ validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1))
+ validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2))
+ validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3))
+ validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1))
+ validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2))
+ validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
}
@Test
@@ -186,25 +168,27 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
- brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
- "AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata
-
- val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
-
- validateLeaderAndReplicas(metaDataForTopic2, 0, 1, Set(1, 2))
- validateLeaderAndReplicas(metaDataForTopic2, 1, 2, Set(0, 2))
- validateLeaderAndReplicas(metaDataForTopic2, 2, 3, Set(1, 3))
+ val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+ assertEquals(1, response.topicMetadata.size)
+ val topicMetadata = response.topicMetadata.asScala.head
+ validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2))
+ validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2))
+ validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3))
}
- def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, expectedReplicas: Set[Int]) = {
- val partitionOpt = metadata.partitionsMetadata.find(_.partitionId == partitionId)
+ def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int,
+ expectedReplicas: Set[Int]): Unit = {
+ val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId)
assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
val partition = partitionOpt.get
- assertTrue("Partition leader should exist", partition.leader.isDefined)
- assertEquals("Partition leader id should match", expectedLeaderId, partition.leader.get.id)
+ assertNotNull("Partition leader should exist", partition.leader)
+ assertEquals("Partition leader id should match", expectedLeaderId, partition.leaderId)
+ assertEquals("Replica set should match", expectedReplicas, partition.replicas.asScala.map(_.id).toSet)
+ }
- assertEquals("Replica set should match", expectedReplicas, partition.replicas.map(_.id).toSet)
+ private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = {
+ val response = connectAndSend(request, ApiKeys.METADATA, destination = destination.getOrElse(anySocketServer))
+ MetadataResponse.parse(response, request.version)
}
}
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
deleted file mode 100644
index 87ffdf1..0000000
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.integration
-
-import kafka.api.TopicMetadataResponse
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
-import org.junit.{Test, After, Before}
-
-class TopicMetadataTest extends ZooKeeperTestHarness {
- private var server1: KafkaServer = null
- private var adHocServers: Seq[KafkaServer] = Seq()
- var brokerEndPoints: Seq[BrokerEndPoint] = null
- var adHocConfigs: Seq[KafkaConfig] = null
- val numConfigs: Int = 4
-
- @Before
- override def setUp() {
- super.setUp()
- val props = createBrokerConfigs(numConfigs, zkConnect)
- val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
- adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
- server1 = TestUtils.createServer(configs.head)
- brokerEndPoints = Seq(
- // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
- // `securityProtocol` instead of PLAINTEXT below
- new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1))
- )
- }
-
- @After
- override def tearDown() {
- TestUtils.shutdownServers(adHocServers :+ server1)
- super.tearDown()
- }
-
- @Test
- def testBasicTopicMetadata(): Unit = {
- // create topic
- val topic = "test"
- createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
- 2000, 0).topicsMetadata
- assertEquals(Errors.NONE, topicsMetadata.head.error)
- assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
- assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
- assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
- val partitionMetadata = topicsMetadata.head.partitionsMetadata
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
- assertEquals(1, partitionMetadata.head.replicas.size)
- }
-
- @Test
- def testGetAllTopicMetadata(): Unit = {
- // create topic
- val topic1 = "testGetAllTopicMetadata1"
- val topic2 = "testGetAllTopicMetadata2"
- createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
- createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
- // issue metadata request with empty list of topics
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
- 2000, 0).topicsMetadata
- assertEquals(Errors.NONE, topicsMetadata.head.error)
- assertEquals(2, topicsMetadata.size)
- assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
- assertEquals(Errors.NONE, topicsMetadata.last.partitionsMetadata.head.error)
- val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
- val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
- assertEquals(1, partitionMetadataTopic1.head.replicas.size)
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
- assertEquals(1, partitionMetadataTopic2.head.replicas.size)
- }
-
- @Test
- def testAutoCreateTopic(): Unit = {
- // auto create topic
- val topic = "testAutoCreateTopic"
- var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
- 2000,0).topicsMetadata
- assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
- assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
- assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
- assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
- // wait for leader to be elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
- TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
-
- // retry the metadata for the auto created topic
- topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
- 2000,0).topicsMetadata
- assertEquals(Errors.NONE, topicsMetadata.head.error)
- assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
- val partitionMetadata = topicsMetadata.head.partitionsMetadata
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
- assertEquals(1, partitionMetadata.head.replicas.size)
- assertTrue(partitionMetadata.head.leader.isDefined)
- }
-
- @Test
- def testAutoCreateTopicWithInvalidReplication(): Unit = {
- val adHocProps = createBrokerConfig(2, zkConnect)
- // Set default replication higher than the number of live brokers
- adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
- // start adHoc brokers with replication factor too high
- val adHocServer = createServer(new KafkaConfig(adHocProps))
- adHocServers = Seq(adHocServer)
- // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
- // `securityProtocol` instead of PLAINTEXT below
- val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
- TestUtils.boundPort(adHocServer))
-
- // auto create topic on "bad" endpoint
- val topic = "testAutoCreateTopic"
- val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
- 2000, 0).topicsMetadata
- assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicsMetadata.head.error)
- assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
- assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
- assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
- }
-
- @Test
- def testAutoCreateTopicWithCollision(): Unit = {
- // auto create topic
- val topic1 = "testAutoCreate_Topic"
- val topic2 = "testAutoCreate.Topic"
- var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
- 2000, 0).topicsMetadata
- assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
- assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
- assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
- assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
- assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION, topicsMetadata(1).error)
-
- // wait for leader to be elected
- TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
- TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
-
- // retry the metadata for the first auto created topic
- topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
- 2000, 0).topicsMetadata
- assertEquals(Errors.NONE, topicsMetadata.head.error)
- assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
- val partitionMetadata = topicsMetadata.head.partitionsMetadata
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
- assertEquals(1, partitionMetadata.head.replicas.size)
- assertTrue(partitionMetadata.head.leader.isDefined)
- }
-
- private def checkIsr(servers: Seq[KafkaServer]): Unit = {
- val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
- val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x =>
- new BrokerEndPoint(x.config.brokerId,
- if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
- TestUtils.boundPort(x))
- }
-
- // Assert that topic metadata at new brokers is updated correctly
- activeBrokers.foreach(x => {
- var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
- waitUntilTrue(() => {
- metadata = ClientUtils.fetchTopicMetadata(Set.empty,
- Seq(new BrokerEndPoint(x.config.brokerId,
- if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
- TestUtils.boundPort(x))),
- "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
- metadata.topicsMetadata.nonEmpty &&
- metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
- expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id)
- },
- "Topic metadata is not correctly updated for broker " + x + ".\n" +
- "Expected ISR: " + expectedIsr + "\n" +
- "Actual ISR : " + (if (metadata.topicsMetadata.nonEmpty &&
- metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
- metadata.topicsMetadata.head.partitionsMetadata.head.isr
- else
- ""), 8000L)
- })
- }
-
- @Test
- def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
- val numBrokers = 2 //just 2 brokers are enough for the test
-
- // start adHoc brokers
- adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
- val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
-
- // create topic
- val topic: String = "test"
- adminZkClient.createTopic(topic, 1, numBrokers)
-
- // shutdown a broker
- adHocServers.last.shutdown()
- adHocServers.last.awaitShutdown()
-
- // startup a broker
- adHocServers.last.startup()
-
- // check metadata is still correct and updated at all brokers
- checkIsr(allServers)
- }
-
- private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
- var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-
- // Get topic metadata from old broker
- // Wait for metadata to get updated by checking metadata from a new broker
- waitUntilTrue(() => {
- topicMetadata = ClientUtils.fetchTopicMetadata(
- Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
- topicMetadata.brokers.size == expectedBrokersCount},
- "Alive brokers list is not correctly propagated by coordinator to brokers"
- )
-
- // Assert that topic metadata at new brokers is updated correctly
- servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
- waitUntilTrue(() => {
- val foundMetadata = ClientUtils.fetchTopicMetadata(
- Set.empty,
- Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))),
- "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
- topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
- topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
- },
- s"Topic metadata is not correctly updated"))
- }
-
- @Test
- def testAliveBrokerListWithNoTopics(): Unit = {
- checkMetadata(Seq(server1), 1)
- }
-
- @Test
- def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = {
- adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
-
- checkMetadata(adHocServers, numConfigs - 1)
-
- // Add a broker
- adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
-
- checkMetadata(adHocServers, numConfigs)
- }
-
-
- @Test
- def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = {
- adHocServers = adHocConfigs.map(p => createServer(p))
-
- checkMetadata(adHocServers, numConfigs)
-
- // Shutdown a broker
- adHocServers.last.shutdown()
- adHocServers.last.awaitShutdown()
-
- checkMetadata(adHocServers, numConfigs - 1)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index d4c3e7c..6b61381 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -21,6 +21,7 @@ import java.util.Properties
import kafka.network.SocketServer
import kafka.utils.TestUtils
+import org.apache.kafka.common.Node
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@@ -33,6 +34,7 @@ import scala.collection.JavaConverters._
class MetadataRequestTest extends BaseRequestTest {
override def propertyOverrides(properties: Properties) {
+ properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2")
properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
}
@@ -145,6 +147,49 @@ class MetadataRequestTest extends BaseRequestTest {
}
@Test
+  def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+    // Keep only the first broker running, so there are fewer live brokers than the
+    // default replication factor requires
+    val brokersToStop = servers.tail
+    brokersToStop.foreach(_.shutdown())
+    brokersToStop.foreach(_.awaitShutdown())
+
+    val topicName = "testAutoCreateTopic"
+    val request = new MetadataRequest.Builder(Seq(topicName).asJava, true).build
+    val response = sendMetadataRequest(request)
+    assertEquals(1, response.topicMetadata.size)
+
+    // The single returned entry must name the topic, carry the error and list no partitions
+    val metadata = response.topicMetadata.asScala.head
+    assertEquals(Errors.INVALID_REPLICATION_FACTOR, metadata.error)
+    assertEquals(topicName, metadata.topic)
+    assertEquals(0, metadata.partitionMetadata.size)
+  }
+
+ @Test
+  def testAutoCreateOfCollidingTopics(): Unit = {
+    // The two names differ only in '_' versus '.'; the second one is expected to be rejected
+    val underscoreTopic = "testAutoCreate_Topic"
+    val dotTopic = "testAutoCreate.Topic"
+    val initialResponse = sendMetadataRequest(
+      new MetadataRequest.Builder(Seq(underscoreTopic, dotTopic).asJava, true).build)
+    assertEquals(2, initialResponse.topicMetadata.size)
+    val pendingMetadata = initialResponse.topicMetadata.asScala.head
+    val rejectedMetadata = initialResponse.topicMetadata.asScala.toSeq(1)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, pendingMetadata.error)
+    assertEquals(underscoreTopic, pendingMetadata.topic)
+    assertEquals(Errors.INVALID_TOPIC_EXCEPTION, rejectedMetadata.error)
+    assertEquals(dotTopic, rejectedMetadata.topic)
+
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, underscoreTopic, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, underscoreTopic, 0)
+
+    // Request the metadata of the first topic again now that a leader has been elected
+    val retryResponse = sendMetadataRequest(
+      new MetadataRequest.Builder(Seq(underscoreTopic).asJava, true).build)
+    val createdMetadata = retryResponse.topicMetadata.asScala.head
+    assertEquals(Errors.NONE, createdMetadata.error)
+    assertEquals(Seq(Errors.NONE), createdMetadata.partitionMetadata.asScala.map(_.error))
+    assertEquals(1, createdMetadata.partitionMetadata.size)
+    val onlyPartition = createdMetadata.partitionMetadata.asScala.head
+    assertEquals(0, onlyPartition.partition)
+    assertEquals(2, onlyPartition.replicas.size)
+    assertNotNull(onlyPartition.leader)
+  }
+
+ @Test
def testAllTopicsRequest() {
// create some topics
createTopic("t1", 3, 2)
@@ -235,8 +280,80 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
}
+ @Test
+  def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
+    // Polls each running broker in `servers` until the ISR it reports for the first
+    // partition of `topic` equals the set of running brokers.
+    def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
+      val runningBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
+      val expectedIsr = runningBrokers
+        .map(b => new Node(b.config.brokerId, "localhost", TestUtils.boundPort(b), b.config.rack.orNull))
+        .sortBy(_.id)
+
+      for (broker <- runningBrokers) {
+        // Reassigned on every poll so the failure message can show the last ISR observed
+        var actualIsr: Seq[Node] = Seq.empty
+        TestUtils.waitUntilTrue(() => {
+          val request = new MetadataRequest.Builder(Seq(topic).asJava, false).build
+          val response = sendMetadataRequest(request, Some(brokerSocketServer(broker.config.brokerId)))
+          val firstPartition = response.topicMetadata.asScala.headOption
+            .flatMap(_.partitionMetadata.asScala.headOption)
+          actualIsr = firstPartition.map(_.isr.asScala.sortBy(_.id)).getOrElse(Seq.empty)
+          expectedIsr == actualIsr
+        }, s"Topic metadata not updated correctly in broker $broker\n" +
+          s"Expected ISR: $expectedIsr \n" +
+          s"Actual ISR : $actualIsr")
+      }
+    }
+
+    val topic = "isr-after-broker-shutdown"
+    val replicaCount = 3
+    createTopic(topic, 1, replicaCount)
+
+    // Bounce the last broker, then wait for every running broker to report the full ISR
+    servers.last.shutdown()
+    servers.last.awaitShutdown()
+    servers.last.startup()
+
+    checkIsr(servers, topic)
+  }
+
+ @Test
+  def testAliveBrokersWithNoTopics(): Unit = {
+    // Waits until the controller reports `expectedBrokersCount` brokers, then waits until every
+    // running broker in `servers` reports the same brokers and the same topic names as the controller.
+    def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+      // Holds the most recent controller response; read by the failure message and after the wait
+      var controllerMetadataResponse: Option[MetadataResponse] = None
+      TestUtils.waitUntilTrue(() => {
+        val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
+          Some(controllerSocketServer))
+        controllerMetadataResponse = Some(metadataResponse)
+        metadataResponse.brokers.size == expectedBrokersCount
+      }, s"Expected $expectedBrokersCount brokers, but there are ${controllerMetadataResponse.get.brokers.size} " +
+        "according to the Controller")
+
+      val controllerResponse = controllerMetadataResponse.get
+      val brokersInController = controllerResponse.brokers.asScala.toSeq.sortBy(_.id)
+      // Topics are compared by name against the controller's view. The previous check compared
+      // a broker's topic metadata with itself, which was always true.
+      // NOTE(review): names are used because it is not visible here whether
+      // MetadataResponse.TopicMetadata defines value equality; confirm before comparing whole objects.
+      val topicsInController = controllerResponse.topicMetadata.asScala.toSeq.map(_.topic).sorted
+
+      // Assert that metadata is propagated correctly
+      servers.filter(_.brokerState.currentState != NotRunning.state).foreach { broker =>
+        TestUtils.waitUntilTrue(() => {
+          val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
+            Some(brokerSocketServer(broker.config.brokerId)))
+          val brokers = metadataResponse.brokers.asScala.toSeq.sortBy(_.id)
+          val topics = metadataResponse.topicMetadata.asScala.toSeq.map(_.topic).sorted
+          brokersInController == brokers && topicsInController == topics
+        }, s"Topic metadata not updated correctly")
+      }
+    }
+
+    // Bounce a broker that is not the active controller and check metadata after each step
+    val serverToShutdown = servers.filterNot(_.kafkaController.isActive).last
+    serverToShutdown.shutdown()
+    serverToShutdown.awaitShutdown()
+    checkMetadata(servers, servers.size - 1)
+
+    serverToShutdown.startup()
+    checkMetadata(servers, servers.size)
+  }
+
// Sends `request` to `destination` (or to `anySocketServer` when no destination is given)
// and parses the reply using the request's version.
private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = {
  val target = destination.getOrElse(anySocketServer)
  val responseBuffer = connectAndSend(request, ApiKeys.METADATA, destination = target)
  MetadataResponse.parse(responseBuffer, request.version)
}
+
}
--
To stop receiving notification emails like this one, please contact
ijuma@apache.org.