You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:14 UTC
[17/37] git commit: KAFKA-1535 Have the metadata response contain all
alive brokers rather than just the ones needed for the given topics.
KAFKA-1535 Have the metadata response contain all alive brokers rather than just the ones needed for the given topics.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ebcdfd5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ebcdfd5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ebcdfd5
Branch: refs/heads/transactional_messaging
Commit: 4ebcdfd51f1e9e4c3d684204e6a785fae6c0e549
Parents: 8e444a3
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Jul 17 15:53:52 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Jul 17 15:53:52 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/api/TopicMetadataRequest.scala | 2 +-
.../main/scala/kafka/api/TopicMetadataResponse.scala | 13 +++----------
core/src/main/scala/kafka/server/KafkaApis.scala | 5 +++--
core/src/main/scala/kafka/server/MetadataCache.scala | 6 ++++++
.../scala/kafka/tools/ReplicaVerificationTool.scala | 2 +-
.../kafka/api/RequestResponseSerializationTest.scala | 2 +-
6 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index a319f2f..bce004f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -79,7 +79,7 @@ case class TopicMetadataRequest(val versionId: Short,
val topicMetadata = topics.map {
topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
}
- val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+ val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index f6b7429..b233d35 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -29,34 +29,27 @@ object TopicMetadataResponse {
val brokerMap = brokers.map(b => (b.id, b)).toMap
val topicCount = buffer.getInt
val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
- new TopicMetadataResponse(topicsMetadata, correlationId)
+ new TopicMetadataResponse(brokers, topicsMetadata, correlationId)
}
}
-case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
+case class TopicMetadataResponse(brokers: Seq[Broker],
+ topicsMetadata: Seq[TopicMetadata],
override val correlationId: Int)
extends RequestOrResponse(correlationId = correlationId) {
val sizeInBytes: Int = {
- val brokers = extractBrokers(topicsMetadata).values
4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
}
def writeTo(buffer: ByteBuffer) {
buffer.putInt(correlationId)
/* brokers */
- val brokers = extractBrokers(topicsMetadata).values
buffer.putInt(brokers.size)
brokers.foreach(_.writeTo(buffer))
/* topic metadata */
buffer.putInt(topicsMetadata.length)
topicsMetadata.foreach(_.writeTo(buffer))
}
-
- def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
- val parts = topicsMetadata.flatMap(_.partitionsMetadata)
- val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
- brokers.map(b => (b.id, b)).toMap
- }
override def describe(details: Boolean):String = { toString }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..fd5f12e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -591,8 +591,9 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
- trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
- val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
+ val brokers = metadataCache.getAliveBrokers
+ trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+ val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 3198cdf..7cd40e1 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -81,6 +81,12 @@ private[server] class MetadataCache {
topicResponses
}
+ def getAliveBrokers = {
+ inLock(partitionMetadataLock.readLock()) {
+ aliveBrokers.values.toList
+ }
+ }
+
def addOrUpdatePartitionInfo(topic: String,
partitionId: Int,
stateInfo: PartitionStateInfo) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index c040f49..af47836 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging {
info("Getting topic metatdata...")
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
- val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
+ val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
true
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index a2117b3..d34ddf5 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -143,7 +143,7 @@ object SerializationTestUtils {
}
def createTestTopicMetadataResponse: TopicMetadataResponse = {
- new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
+ new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
}
def createTestOffsetCommitRequest: OffsetCommitRequest = {