You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/05 07:48:27 UTC

kafka git commit: KAFKA-5170; KafkaAdminClientIntegrationTest should wait until metadata is propagated to all brokers

Repository: kafka
Updated Branches:
  refs/heads/trunk 95b48b157 -> 05ea454df


KAFKA-5170; KafkaAdminClientIntegrationTest should wait until metadata is propagated to all brokers

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2976 from cmccabe/KAFKA-5170


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05ea454d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05ea454d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05ea454d

Branch: refs/heads/trunk
Commit: 05ea454dfb1c68e9026b74fe443279e3a4bc5182
Parents: 95b48b1
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 5 08:36:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 5 08:46:19 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/KafkaAdminClientIntegrationTest.scala  |  8 +++++++-
 core/src/test/scala/unit/kafka/utils/TestUtils.scala | 15 ++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05ea454d/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 04ed9c9..455ab61 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.common.KafkaFuture
 import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{After, Rule, Test}
+import org.junit.{After, Before, Rule, Test}
 import org.junit.rules.Timeout
 import org.junit.Assert._
 
@@ -47,6 +47,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
 
   var client: AdminClient = null
 
+  @Before
+  override def setUp(): Unit = {
+    super.setUp
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+  }
+
   @After
   def closeClient(): Unit = {
     if (client != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ea454d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e826c7f..bedc7bc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,7 +37,7 @@ import kafka.producer._
 import kafka.security.auth.{Acl, Authorizer, Resource}
 import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
 import kafka.server._
-import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
+import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
@@ -827,6 +827,19 @@ object TestUtils extends Logging {
     byteBuffer
   }
 
+  /**
+    * Wait until all brokers know about each other.
+    *
+    * @param servers The Kafka broker servers.
+    * @param timeout The maximum time, in milliseconds, to wait for this condition before the assertion fails
+    */
+  def waitUntilBrokerMetadataIsPropagated(servers: Seq[KafkaServer],
+                                          timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
+    val expectedBrokerIds = servers.map(_.config.brokerId).toSet
+    TestUtils.waitUntilTrue(() => servers.forall(server =>
+      expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+    ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
+  }
 
   /**
    * Wait until a valid leader is propagated to the metadata cache in each broker.