Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/14 03:34:56 UTC

[kafka] branch trunk updated: MINOR: Replace test usages of ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0b591d7  MINOR: Replace test usages of ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)
0b591d7 is described below

commit 0b591d703b574165fccf3c3a34f984beea140992
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Wed Jun 13 20:33:38 2018 -0700

    MINOR: Replace test usages of ClientUtils.fetchTopicMetadata with BaseRequestTest (#5216)
    
    For tests that are not exercising the old consumer's functionality. As part of this,
    consolidate `TopicMetadataTest` into `MetadataRequestTest`. Finally,
    remove `ProducerBounceTest`, which has no tests left in it.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../integration/kafka/api/ProducerBounceTest.scala | 116 ---------
 .../scala/unit/kafka/admin/AddPartitionsTest.scala | 136 +++++-----
 .../unit/kafka/integration/TopicMetadataTest.scala | 289 ---------------------
 .../unit/kafka/server/MetadataRequestTest.scala    | 117 +++++++++
 4 files changed, 177 insertions(+), 481 deletions(-)
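
[Editor's note: for context, the pattern this commit converges on looks roughly like the
sketch below. Tests extend BaseRequestTest, build a wire-level MetadataRequest, send it to
an arbitrary broker socket, and parse the raw response, instead of going through the old
Scala client's ClientUtils.fetchTopicMetadata. This is a minimal illustration assembled
from the helpers visible in the diff (connectAndSend, anySocketServer and createTopic come
from the test harness); the class and method names are hypothetical and not part of the
commit itself.]

    import kafka.server.BaseRequestTest
    import org.apache.kafka.common.protocol.{ApiKeys, Errors}
    import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
    import org.junit.Assert._
    import org.junit.Test

    import scala.collection.JavaConverters._

    class ExampleMetadataFetchTest extends BaseRequestTest {

      // Illustrative helper mirroring the sendMetadataRequest methods added in this commit.
      private def fetchMetadata(topic: String): MetadataResponse = {
        val request = new MetadataRequest.Builder(Seq(topic).asJava, false).build
        // connectAndSend (from BaseRequestTest) sends the request over a plain socket
        // to an arbitrary broker and returns the raw response bytes.
        val response = connectAndSend(request, ApiKeys.METADATA, destination = anySocketServer)
        MetadataResponse.parse(response, request.version)
      }

      @Test
      def exampleTopicMetadataAssertion(): Unit = {
        createTopic("example-topic", 1, 1)
        val metadata = fetchMetadata("example-topic").topicMetadata.asScala.head
        assertEquals(Errors.NONE, metadata.error)
        assertEquals(1, metadata.partitionMetadata.size)
      }
    }
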

diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
deleted file mode 100644
index a11afd3..0000000
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package kafka.api
-
-import java.util.Properties
-import java.util.concurrent.Future
-
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{ShutdownableThread, TestUtils}
-import kafka.utils.Implicits._
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.StringDeserializer
-import org.junit.Assert._
-import org.junit.{Ignore, Test}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-class ProducerBounceTest extends KafkaServerTestHarness {
-  private val producerBufferSize =  65536
-  private val serverMessageMaxBytes =  producerBufferSize/2
-
-  val numServers = 4
-
-  val overridingProps = new Properties()
-  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, "false")
-  overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
-  // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offset topic)
-  // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
-  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
-  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "true")
-  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "false")
-  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
-  // This is the one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
-  // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
-  // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
-  // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
-  // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
-  // running server.
-  //
-  // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
-  // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
-  override def generateConfigs = {
-    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
-      .map(KafkaConfig.fromProps(_, overridingProps))
-  }
-
-  private val topic1 = "topic-1"
-
-  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false) {
-    val numRecords = 1000
-    var sent = 0
-    var failed = false
-
-    val producerConfig = new Properties()
-    producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5")
-    val producerConfigWithCompression = new Properties()
-    producerConfigWithCompression ++= producerConfig
-    producerConfigWithCompression.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4")
-    val producers = List(
-      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 4, retries = 10, props = Some(producerConfig)),
-      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize / 2, retries = 10, lingerMs = 5000, props = Some(producerConfig)),
-      TestUtils.createNewProducer(brokerList, bufferSize = producerBufferSize, retries = 10, lingerMs = 10000, props = Some(producerConfigWithCompression))
-    )
-
-    override def doWork(): Unit = {
-      info("Starting to send messages..")
-      var producerId = 0
-      val responses = new ArrayBuffer[IndexedSeq[Future[RecordMetadata]]]()
-      for (producer <- producers) {
-        val response =
-          for (i <- sent+1 to sent+numRecords)
-            yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, null, ((producerId + 1) * i).toString.getBytes),
-              new ErrorLoggingCallback(topic1, null, null, true))
-        responses.append(response)
-        producerId += 1
-      }
-
-      try {
-        for (response <- responses) {
-          val futures = response.toList
-          futures.map(_.get)
-          sent += numRecords
-        }
-        info(s"Sent $sent records")
-      } catch {
-        case e : Exception =>
-          error(s"Got exception ${e.getMessage}")
-          e.printStackTrace()
-          failed = true
-      }
-    }
-
-    override def shutdown(){
-      super.shutdown()
-      for (producer <- producers) {
-        producer.close()
-      }
-    }
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 43d8ec8..4d1e4ab 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -17,24 +17,23 @@
 
 package kafka.admin
 
-import kafka.api.TopicMetadata
+import kafka.network.SocketServer
 import org.junit.Assert._
-import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
-import kafka.cluster.Broker
-import kafka.client.ClientUtils
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.server.BaseRequestTest
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.{After, Before, Test}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
+import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
+import org.junit.{Before, Test}
 
-class AddPartitionsTest extends ZooKeeperTestHarness {
-  var configs: Seq[KafkaConfig] = null
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-  var brokers: Seq[Broker] = Seq.empty[Broker]
+import scala.collection.JavaConverters._
+
+class AddPartitionsTest extends BaseRequestTest {
+
+  protected override def numBrokers: Int = 4
 
   val partitionId = 0
 
@@ -53,22 +52,10 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   override def setUp() {
     super.setUp()
 
-    configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
-    // start all the servers
-    servers = configs.map(c => TestUtils.createServer(c))
-    brokers = servers.map(s => TestUtils.createBroker(s.config.brokerId, s.config.hostName, TestUtils.boundPort(s)))
-
-    // create topics first
-    createTopic(zkClient, topic1, partitionReplicaAssignment = topic1Assignment, servers = servers)
-    createTopic(zkClient, topic2, partitionReplicaAssignment = topic2Assignment, servers = servers)
-    createTopic(zkClient, topic3, partitionReplicaAssignment = topic3Assignment, servers = servers)
-    createTopic(zkClient, topic4, partitionReplicaAssignment = topic4Assignment, servers = servers)
-  }
-
-  @After
-  override def tearDown() {
-    TestUtils.shutdownServers(servers)
-    super.tearDown()
+    createTopic(topic1, partitionReplicaAssignment = topic1Assignment)
+    createTopic(topic2, partitionReplicaAssignment = topic2Assignment)
+    createTopic(topic3, partitionReplicaAssignment = topic3Assignment)
+    createTopic(topic4, partitionReplicaAssignment = topic4Assignment)
   }
 
   @Test
@@ -108,17 +95,15 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
-    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.brokerEndPoint(listenerName)),
-      "AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
-    val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
-    val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
-    assertEquals(partitionDataForTopic1.size, 3)
-    assertEquals(partitionDataForTopic1(1).partitionId, 1)
-    assertEquals(partitionDataForTopic1(2).partitionId, 2)
-    val replicas = partitionDataForTopic1(1).replicas
+    val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, false).build)
+    assertEquals(1, response.topicMetadata.size)
+    val partitions = response.topicMetadata.asScala.head.partitionMetadata.asScala.sortBy(_.partition)
+    assertEquals(partitions.size, 3)
+    assertEquals(1, partitions(1).partition)
+    assertEquals(2, partitions(2).partition)
+    val replicas = partitions(1).replicas
     assertEquals(replicas.size, 2)
-    assert(replicas.contains(partitionDataForTopic1(1).leader.get))
+    assertTrue(replicas.contains(partitions(1).leader))
   }
 
   @Test
@@ -137,18 +122,18 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
-      "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
-    val metaDataForTopic2 = metadata.filter(_.topic == topic2)
-    val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
-    assertEquals(3, partitionDataForTopic2.size)
-    assertEquals(1, partitionDataForTopic2(1).partitionId)
-    assertEquals(2, partitionDataForTopic2(2).partitionId)
-    val replicas = partitionDataForTopic2(1).replicas
+    val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+    assertEquals(1, response.topicMetadata.size)
+    val topicMetadata = response.topicMetadata.asScala.head
+    val partitionMetadata = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
+    assertEquals(3, topicMetadata.partitionMetadata.size)
+    assertEquals(0, partitionMetadata(0).partition)
+    assertEquals(1, partitionMetadata(1).partition)
+    assertEquals(2, partitionMetadata(2).partition)
+    val replicas = topicMetadata.partitionMetadata.get(1).replicas
     assertEquals(2, replicas.size)
-    assertTrue(replicas.head.id == 0 || replicas.head.id == 1)
-    assertTrue(replicas(1).id == 0 || replicas(1).id == 1)
+    assertTrue(replicas.asScala.head.id == 0 || replicas.asScala.head.id == 1)
+    assertTrue(replicas.asScala(1).id == 0 || replicas.asScala(1).id == 1)
   }
 
   @Test
@@ -163,19 +148,16 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
-      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
-      "AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata
-
-    val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
-
-    validateLeaderAndReplicas(metaDataForTopic3, 0, 2, Set(2, 3, 0, 1))
-    validateLeaderAndReplicas(metaDataForTopic3, 1, 3, Set(3, 2, 0, 1))
-    validateLeaderAndReplicas(metaDataForTopic3, 2, 0, Set(0, 3, 1, 2))
-    validateLeaderAndReplicas(metaDataForTopic3, 3, 1, Set(1, 0, 2, 3))
-    validateLeaderAndReplicas(metaDataForTopic3, 4, 2, Set(2, 3, 0, 1))
-    validateLeaderAndReplicas(metaDataForTopic3, 5, 3, Set(3, 0, 1, 2))
-    validateLeaderAndReplicas(metaDataForTopic3, 6, 0, Set(0, 1, 2, 3))
+    val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
+    assertEquals(1, response.topicMetadata.size)
+    val topicMetadata = response.topicMetadata.asScala.head
+    validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1))
+    validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1))
+    validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2))
+    validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3))
+    validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1))
+    validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2))
+    validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
   }
 
   @Test
@@ -186,25 +168,27 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
-      "AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata
-
-    val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
-
-    validateLeaderAndReplicas(metaDataForTopic2, 0, 1, Set(1, 2))
-    validateLeaderAndReplicas(metaDataForTopic2, 1, 2, Set(0, 2))
-    validateLeaderAndReplicas(metaDataForTopic2, 2, 3, Set(1, 3))
+    val response = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
+    assertEquals(1, response.topicMetadata.size)
+    val topicMetadata = response.topicMetadata.asScala.head
+    validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2))
+    validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2))
+    validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3))
   }
 
-  def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, expectedReplicas: Set[Int]) = {
-    val partitionOpt = metadata.partitionsMetadata.find(_.partitionId == partitionId)
+  def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int,
+                                expectedReplicas: Set[Int]): Unit = {
+    val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId)
     assertTrue(s"Partition $partitionId should exist", partitionOpt.isDefined)
     val partition = partitionOpt.get
 
-    assertTrue("Partition leader should exist", partition.leader.isDefined)
-    assertEquals("Partition leader id should match", expectedLeaderId, partition.leader.get.id)
+    assertNotNull("Partition leader should exist", partition.leader)
+    assertEquals("Partition leader id should match", expectedLeaderId, partition.leaderId)
+    assertEquals("Replica set should match", expectedReplicas, partition.replicas.asScala.map(_.id).toSet)
+  }
 
-    assertEquals("Replica set should match", expectedReplicas, partition.replicas.map(_.id).toSet)
+  private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = {
+    val response = connectAndSend(request, ApiKeys.METADATA, destination = destination.getOrElse(anySocketServer))
+    MetadataResponse.parse(response, request.version)
   }
 }
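
[Editor's note: one detail worth calling out in the conversion above is that
MetadataResponse exposes Java collections, so the rewritten assertions repeatedly bridge
via scala.collection.JavaConverters. A standalone sketch of that idiom, assuming a
response value obtained from a helper like sendMetadataRequest above:]

    val topicMetadata = response.topicMetadata.asScala.head
    // partitionMetadata is a java.util.List; convert before using Scala combinators.
    val partitions = topicMetadata.partitionMetadata.asScala.sortBy(_.partition)
    val replicaIds = partitions.head.replicas.asScala.map(_.id).toSet
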
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
deleted file mode 100644
index 87ffdf1..0000000
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package kafka.integration
-
-import kafka.api.TopicMetadataResponse
-import kafka.client.ClientUtils
-import kafka.cluster.BrokerEndPoint
-import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
-import kafka.utils.TestUtils
-import kafka.utils.TestUtils._
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.common.protocol.Errors
-import org.junit.Assert._
-import org.junit.{Test, After, Before}
-
-class TopicMetadataTest extends ZooKeeperTestHarness {
-  private var server1: KafkaServer = null
-  private var adHocServers: Seq[KafkaServer] = Seq()
-  var brokerEndPoints: Seq[BrokerEndPoint] = null
-  var adHocConfigs: Seq[KafkaConfig] = null
-  val numConfigs: Int = 4
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val props = createBrokerConfigs(numConfigs, zkConnect)
-    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
-    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
-    server1 = TestUtils.createServer(configs.head)
-    brokerEndPoints = Seq(
-      // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
-      // `securityProtocol` instead of PLAINTEXT below
-      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1))
-    )
-  }
-
-  @After
-  override def tearDown() {
-    TestUtils.shutdownServers(adHocServers :+ server1)
-    super.tearDown()
-  }
-
-  @Test
-  def testBasicTopicMetadata(): Unit = {
-    // create topic
-    val topic = "test"
-    createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", "test", topicsMetadata.head.topic)
-    val partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-  }
-
-  @Test
-  def testGetAllTopicMetadata(): Unit = {
-    // create topic
-    val topic1 = "testGetAllTopicMetadata1"
-    val topic2 = "testGetAllTopicMetadata2"
-    createTopic(zkClient, topic1, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-    createTopic(zkClient, topic2, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
-
-    // issue metadata request with empty list of topics
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set.empty, brokerEndPoints, "TopicMetadataTest-testGetAllTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(2, topicsMetadata.size)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.last.partitionsMetadata.head.error)
-    val partitionMetadataTopic1 = topicsMetadata.head.partitionsMetadata
-    val partitionMetadataTopic2 = topicsMetadata.last.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic1.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic1.head.partitionId)
-    assertEquals(1, partitionMetadataTopic1.head.replicas.size)
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadataTopic2.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadataTopic2.head.partitionId)
-    assertEquals(1, partitionMetadataTopic2.head.replicas.size)
-  }
-
-  @Test
-  def testAutoCreateTopic(): Unit = {
-    // auto create topic
-    val topic = "testAutoCreateTopic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic, 0)
-
-    // retry the metadata for the auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    val partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  @Test
-  def testAutoCreateTopicWithInvalidReplication(): Unit = {
-    val adHocProps = createBrokerConfig(2, zkConnect)
-    // Set default replication higher than the number of live brokers
-    adHocProps.setProperty(KafkaConfig.DefaultReplicationFactorProp, "3")
-    // start adHoc brokers with replication factor too high
-    val adHocServer = createServer(new KafkaConfig(adHocProps))
-    adHocServers = Seq(adHocServer)
-    // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
-    // `securityProtocol` instead of PLAINTEXT below
-    val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
-      TestUtils.boundPort(adHocServer))
-
-    // auto create topic on "bad" endpoint
-    val topic = "testAutoCreateTopic"
-    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicsMetadata.head.error)
-    assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
-    assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
-    assertEquals(0, topicsMetadata.head.partitionsMetadata.size)
-  }
-
-  @Test
-  def testAutoCreateTopicWithCollision(): Unit = {
-    // auto create topic
-    val topic1 = "testAutoCreate_Topic"
-    val topic2 = "testAutoCreate.Topic"
-    var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000, 0).topicsMetadata
-    assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
-    assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
-    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicsMetadata.head.error)
-    assertEquals("Expecting metadata for topic2", topic2, topicsMetadata(1).topic)
-    assertEquals("Expecting InvalidTopicCode for topic2 metadata", Errors.INVALID_TOPIC_EXCEPTION, topicsMetadata(1).error)
-
-    // wait for leader to be elected
-    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
-    TestUtils.waitUntilMetadataIsPropagated(Seq(server1), topic1, 0)
-
-    // retry the metadata for the first auto created topic
-    topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000, 0).topicsMetadata
-    assertEquals(Errors.NONE, topicsMetadata.head.error)
-    assertEquals(Errors.NONE, topicsMetadata.head.partitionsMetadata.head.error)
-    val partitionMetadata = topicsMetadata.head.partitionsMetadata
-    assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
-    assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
-    assertEquals(1, partitionMetadata.head.replicas.size)
-    assertTrue(partitionMetadata.head.leader.isDefined)
-  }
-
-  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
-    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
-    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x =>
-      new BrokerEndPoint(x.config.brokerId,
-        if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-        TestUtils.boundPort(x))
-    }
-
-    // Assert that topic metadata at new brokers is updated correctly
-    activeBrokers.foreach(x => {
-      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-      waitUntilTrue(() => {
-        metadata = ClientUtils.fetchTopicMetadata(Set.empty,
-                                Seq(new BrokerEndPoint(x.config.brokerId,
-                                                       if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                                                       TestUtils.boundPort(x))),
-                                "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-        metadata.topicsMetadata.nonEmpty &&
-          metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
-          expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id)
-      },
-        "Topic metadata is not correctly updated for broker " + x + ".\n" +
-        "Expected ISR: " + expectedIsr + "\n" +
-        "Actual ISR  : " + (if (metadata.topicsMetadata.nonEmpty &&
-                                metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
-                              metadata.topicsMetadata.head.partitionsMetadata.head.isr
-                            else
-                              ""), 8000L)
-    })
-  }
-
-  @Test
-  def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
-    val numBrokers = 2 //just 2 brokers are enough for the test
-
-    // start adHoc brokers
-    adHocServers = adHocConfigs.take(numBrokers - 1).map(p => createServer(p))
-    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
-
-    // create topic
-    val topic: String = "test"
-    adminZkClient.createTopic(topic, 1, numBrokers)
-
-    // shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    // startup a broker
-    adHocServers.last.startup()
-
-    // check metadata is still correct and updated at all brokers
-    checkIsr(allServers)
-  }
-
-  private def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
-    var topicMetadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
-
-    // Get topic metadata from old broker
-    // Wait for metadata to get updated by checking metadata from a new broker
-    waitUntilTrue(() => {
-    topicMetadata = ClientUtils.fetchTopicMetadata(
-      Set.empty, brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-    topicMetadata.brokers.size == expectedBrokersCount},
-      "Alive brokers list is not correctly propagated by coordinator to brokers"
-    )
-
-    // Assert that topic metadata at new brokers is updated correctly
-    servers.filter(x => x.brokerState.currentState != NotRunning.state).foreach(x =>
-      waitUntilTrue(() => {
-          val foundMetadata = ClientUtils.fetchTopicMetadata(
-            Set.empty,
-            Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))),
-            "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
-          topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
-            topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)
-        },
-        s"Topic metadata is not correctly updated"))
-  }
-
-  @Test
-  def testAliveBrokerListWithNoTopics(): Unit = {
-    checkMetadata(Seq(server1), 1)
-  }
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterNewBrokerStartup(): Unit = {
-    adHocServers = adHocConfigs.takeRight(adHocConfigs.size - 1).map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs - 1)
-
-    // Add a broker
-    adHocServers = adHocServers ++ Seq(createServer(adHocConfigs.head))
-
-    checkMetadata(adHocServers, numConfigs)
-  }
-
-
-  @Test
-  def testAliveBrokersListWithNoTopicsAfterABrokerShutdown(): Unit = {
-    adHocServers = adHocConfigs.map(p => createServer(p))
-
-    checkMetadata(adHocServers, numConfigs)
-
-    // Shutdown a broker
-    adHocServers.last.shutdown()
-    adHocServers.last.awaitShutdown()
-
-    checkMetadata(adHocServers, numConfigs - 1)
-  }
-}
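
[Editor's note: the deleted tests above polled metadata through ClientUtils until the ISR
or broker list converged; their replacements in MetadataRequestTest (next diff) keep the
same polling structure around the new request helper. A condensed sketch of that
wait-until-converged idiom, assuming a sendMetadataRequest helper like the one added in
the diff and a hypothetical expectedBrokersCount value:]

    TestUtils.waitUntilTrue(() => {
      // Re-fetch the full broker list on each attempt until it reaches the expected size.
      val response = sendMetadataRequest(MetadataRequest.Builder.allTopics.build)
      response.brokers.size == expectedBrokersCount
    }, s"Expected $expectedBrokersCount brokers in metadata")
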
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index d4c3e7c..6b61381 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
@@ -33,6 +34,7 @@ import scala.collection.JavaConverters._
 class MetadataRequestTest extends BaseRequestTest {
 
   override def propertyOverrides(properties: Properties) {
+    properties.setProperty(KafkaConfig.DefaultReplicationFactorProp, "2")
     properties.setProperty(KafkaConfig.RackProp, s"rack/${properties.getProperty(KafkaConfig.BrokerIdProp)}")
   }
 
@@ -145,6 +147,49 @@ class MetadataRequestTest extends BaseRequestTest {
   }
 
   @Test
+  def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+    // Shutdown all but one broker so that the number of brokers is less than the default replication factor
+    servers.tail.foreach(_.shutdown())
+    servers.tail.foreach(_.awaitShutdown())
+
+    val topic1 = "testAutoCreateTopic"
+    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, true).build)
+    assertEquals(1, response1.topicMetadata.size)
+    val topicMetadata = response1.topicMetadata.asScala.head
+    assertEquals(Errors.INVALID_REPLICATION_FACTOR, topicMetadata.error)
+    assertEquals(topic1, topicMetadata.topic)
+    assertEquals(0, topicMetadata.partitionMetadata.size)
+  }
+
+  @Test
+  def testAutoCreateOfCollidingTopics(): Unit = {
+    val topic1 = "testAutoCreate_Topic"
+    val topic2 = "testAutoCreate.Topic"
+    val response1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true).build)
+    assertEquals(2, response1.topicMetadata.size)
+    var topicMetadata1 = response1.topicMetadata.asScala.head
+    val topicMetadata2 = response1.topicMetadata.asScala.toSeq(1)
+    assertEquals(Errors.LEADER_NOT_AVAILABLE, topicMetadata1.error)
+    assertEquals(topic1, topicMetadata1.topic)
+    assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error)
+    assertEquals(topic2, topicMetadata2.topic)
+    
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
+    TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
+
+    // retry the metadata for the first auto created topic
+    val response2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1).asJava, true).build)
+    topicMetadata1 = response2.topicMetadata.asScala.head
+    assertEquals(Errors.NONE, topicMetadata1.error)
+    assertEquals(Seq(Errors.NONE), topicMetadata1.partitionMetadata.asScala.map(_.error))
+    assertEquals(1, topicMetadata1.partitionMetadata.size)
+    val partitionMetadata = topicMetadata1.partitionMetadata.asScala.head
+    assertEquals(0, partitionMetadata.partition)
+    assertEquals(2, partitionMetadata.replicas.size)
+    assertNotNull(partitionMetadata.leader)
+  }
+
+  @Test
   def testAllTopicsRequest() {
     // create some topics
     createTopic("t1", 3, 2)
@@ -235,8 +280,80 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(s"Response should have $replicaCount replicas", replicaCount, v1PartitionMetadata.replicas.size)
   }
 
+  @Test
+  def testIsrAfterBrokerShutDownAndJoinsBack(): Unit = {
+    def checkIsr(servers: Seq[KafkaServer], topic: String): Unit = {
+      val activeBrokers = servers.filter(_.brokerState.currentState != NotRunning.state)
+      val expectedIsr = activeBrokers.map { broker =>
+        new Node(broker.config.brokerId, "localhost", TestUtils.boundPort(broker), broker.config.rack.orNull)
+      }.sortBy(_.id)
+
+      // Assert that topic metadata at new brokers is updated correctly
+      activeBrokers.foreach { broker =>
+        var actualIsr: Seq[Node] = Seq.empty
+        TestUtils.waitUntilTrue(() => {
+          val metadataResponse = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic).asJava, false).build,
+            Some(brokerSocketServer(broker.config.brokerId)))
+          val firstPartitionMetadata = metadataResponse.topicMetadata.asScala.headOption.flatMap(_.partitionMetadata.asScala.headOption)
+          actualIsr = firstPartitionMetadata.map { partitionMetadata =>
+            partitionMetadata.isr.asScala.sortBy(_.id)
+          }.getOrElse(Seq.empty)
+          expectedIsr == actualIsr
+        }, s"Topic metadata not updated correctly in broker $broker\n" +
+          s"Expected ISR: $expectedIsr \n" +
+          s"Actual ISR : $actualIsr")
+      }
+    }
+
+    val topic = "isr-after-broker-shutdown"
+    val replicaCount = 3
+    createTopic(topic, 1, replicaCount)
+
+    servers.last.shutdown()
+    servers.last.awaitShutdown()
+    servers.last.startup()
+
+    checkIsr(servers, topic)
+  }
+
+  @Test
+  def testAliveBrokersWithNoTopics(): Unit = {
+    def checkMetadata(servers: Seq[KafkaServer], expectedBrokersCount: Int): Unit = {
+      var controllerMetadataResponse: Option[MetadataResponse] = None
+      TestUtils.waitUntilTrue(() => {
+        val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
+          Some(controllerSocketServer))
+        controllerMetadataResponse = Some(metadataResponse)
+        metadataResponse.brokers.size == expectedBrokersCount
+      }, s"Expected $expectedBrokersCount brokers, but there are ${controllerMetadataResponse.get.brokers.size} " +
+        "according to the Controller")
+
+      val brokersInController = controllerMetadataResponse.get.brokers.asScala.toSeq.sortBy(_.id)
+
+      // Assert that metadata is propagated correctly
+      servers.filter(_.brokerState.currentState != NotRunning.state).foreach { broker =>
+        TestUtils.waitUntilTrue(() => {
+          val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build,
+            Some(brokerSocketServer(broker.config.brokerId)))
+          val brokers = metadataResponse.brokers.asScala.toSeq.sortBy(_.id)
+          val topicMetadata = metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic)
+          brokersInController == brokers && metadataResponse.topicMetadata.asScala.toSeq.sortBy(_.topic) == topicMetadata
+        }, s"Topic metadata not updated correctly")
+      }
+    }
+
+    val serverToShutdown = servers.filterNot(_.kafkaController.isActive).last
+    serverToShutdown.shutdown()
+    serverToShutdown.awaitShutdown()
+    checkMetadata(servers, servers.size - 1)
+
+    serverToShutdown.startup()
+    checkMetadata(servers, servers.size)
+  }
+
   private def sendMetadataRequest(request: MetadataRequest, destination: Option[SocketServer] = None): MetadataResponse = {
     val response = connectAndSend(request, ApiKeys.METADATA, destination = destination.getOrElse(anySocketServer))
     MetadataResponse.parse(response, request.version)
   }
+
 }
