You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/05 07:48:27 UTC
kafka git commit: KAFKA-5170; KafkaAdminClientIntegrationTest should wait until metadata is propagated to all brokers
Repository: kafka
Updated Branches:
refs/heads/trunk 95b48b157 -> 05ea454df
KAFKA-5170; KafkaAdminClientIntegrationTest should wait until metadata is propagated to all brokers
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2976 from cmccabe/KAFKA-5170
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05ea454d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05ea454d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05ea454d
Branch: refs/heads/trunk
Commit: 05ea454dfb1c68e9026b74fe443279e3a4bc5182
Parents: 95b48b1
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 5 08:36:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 5 08:46:19 2017 +0100
----------------------------------------------------------------------
.../kafka/api/KafkaAdminClientIntegrationTest.scala | 8 +++++++-
core/src/test/scala/unit/kafka/utils/TestUtils.scala | 15 ++++++++++++++-
2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/05ea454d/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
index 04ed9c9..455ab61 100644
--- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.KafkaFuture
import org.apache.kafka.common.errors.TopicExistsException
import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.{After, Rule, Test}
+import org.junit.{After, Before, Rule, Test}
import org.junit.rules.Timeout
import org.junit.Assert._
@@ -47,6 +47,12 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin
var client: AdminClient = null
+ /**
+ * Runs the parent harness setup, then waits until broker metadata has been
+ * propagated to all servers so that tests do not race with metadata propagation.
+ */
+ @Before
+ override def setUp(): Unit = {
+ // Explicit parentheses: setUp() is side-effecting, so do not rely on auto-application.
+ super.setUp()
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ }
+
@After
def closeClient(): Unit = {
if (client != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/05ea454d/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e826c7f..bedc7bc 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -37,7 +37,7 @@ import kafka.producer._
import kafka.security.auth.{Acl, Authorizer, Resource}
import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder}
import kafka.server._
-import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
+import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils.ZkUtils._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
@@ -827,6 +827,19 @@ object TestUtils extends Logging {
byteBuffer
}
+ /**
+ * Block until the metadata cache of every given broker reports exactly the
+ * full set of given broker ids as alive.
+ *
+ * @param servers The Kafka brokers whose metadata caches are checked.
+ * @param timeout The maximum time to wait for the condition before the wait fails.
+ */
+ def waitUntilBrokerMetadataIsPropagated(servers: Seq[KafkaServer],
+ timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
+ val allBrokerIds = servers.map(_.config.brokerId).toSet
+ // True once this broker's view of alive brokers matches the complete id set.
+ def seesAllBrokers(broker: KafkaServer): Boolean = {
+ val aliveIds = broker.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+ aliveIds == allBrokerIds
+ }
+ TestUtils.waitUntilTrue(() => servers.forall(seesAllBrokers),
+ "Timed out waiting for broker metadata to propagate to all servers", timeout)
+ }
/**
* Wait until a valid leader is propagated to the metadata cache in each broker.