You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:14 UTC

[17/37] git commit: KAFKA-1535 Have the metadata response contain all alive brokers rather than just the ones needed for the given topics.

KAFKA-1535 Have the metadata response contain all alive brokers rather than just the ones needed for the given topics.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ebcdfd5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ebcdfd5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ebcdfd5

Branch: refs/heads/transactional_messaging
Commit: 4ebcdfd51f1e9e4c3d684204e6a785fae6c0e549
Parents: 8e444a3
Author: Jay Kreps <ja...@gmail.com>
Authored: Thu Jul 17 15:53:52 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Jul 17 15:53:52 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/TopicMetadataRequest.scala    |  2 +-
 .../main/scala/kafka/api/TopicMetadataResponse.scala   | 13 +++----------
 core/src/main/scala/kafka/server/KafkaApis.scala       |  5 +++--
 core/src/main/scala/kafka/server/MetadataCache.scala   |  6 ++++++
 .../scala/kafka/tools/ReplicaVerificationTool.scala    |  2 +-
 .../kafka/api/RequestResponseSerializationTest.scala   |  2 +-
 6 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index a319f2f..bce004f 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -79,7 +79,7 @@ case class TopicMetadataRequest(val versionId: Short,
     val topicMetadata = topics.map {
       topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
     }
-    val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+    val errorResponse = TopicMetadataResponse(Seq(), topicMetadata, correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
index f6b7429..b233d35 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
@@ -29,34 +29,27 @@ object TopicMetadataResponse {
     val brokerMap = brokers.map(b => (b.id, b)).toMap
     val topicCount = buffer.getInt
     val topicsMetadata = (0 until topicCount).map(_ => TopicMetadata.readFrom(buffer, brokerMap))
-    new TopicMetadataResponse(topicsMetadata, correlationId)
+    new TopicMetadataResponse(brokers, topicsMetadata, correlationId)
   }
 }
 
-case class TopicMetadataResponse(topicsMetadata: Seq[TopicMetadata],
+case class TopicMetadataResponse(brokers: Seq[Broker],
+                                 topicsMetadata: Seq[TopicMetadata],
                                  override val correlationId: Int)
     extends RequestOrResponse(correlationId = correlationId) {
   val sizeInBytes: Int = {
-    val brokers = extractBrokers(topicsMetadata).values
     4 + 4 + brokers.map(_.sizeInBytes).sum + 4 + topicsMetadata.map(_.sizeInBytes).sum
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
     /* brokers */
-    val brokers = extractBrokers(topicsMetadata).values
     buffer.putInt(brokers.size)
     brokers.foreach(_.writeTo(buffer))
     /* topic metadata */
     buffer.putInt(topicsMetadata.length)
     topicsMetadata.foreach(_.writeTo(buffer))
   }
-    
-  def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
-    val parts = topicsMetadata.flatMap(_.partitionsMetadata)
-    val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
-    brokers.map(b => (b.id, b)).toMap
-  }
 
   override def describe(details: Boolean):String = { toString }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0b668f2..fd5f12e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -591,8 +591,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
     val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
-    trace("Sending topic metadata %s for correlation id %d to client %s".format(topicMetadata.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
-    val response = new TopicMetadataResponse(topicMetadata, metadataRequest.correlationId)
+    val brokers = metadataCache.getAliveBrokers
+    trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId))
+    val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index 3198cdf..7cd40e1 100644
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -81,6 +81,12 @@ private[server] class MetadataCache {
     topicResponses
   }
 
+  def getAliveBrokers = {
+    inLock(partitionMetadataLock.readLock()) {
+      aliveBrokers.values.toList
+    }
+  }
+
   def addOrUpdatePartitionInfo(topic: String,
                                partitionId: Int,
                                stateInfo: PartitionStateInfo) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index c040f49..af47836 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -118,7 +118,7 @@ object ReplicaVerificationTool extends Logging {
     info("Getting topic metatdata...")
     val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topicsMetadataResponse = ClientUtils.fetchTopicMetadata(Set[String](), metadataTargetBrokers, clientId, maxWaitMs)
-    val brokerMap = topicsMetadataResponse.extractBrokers(topicsMetadataResponse.topicsMetadata)
+    val brokerMap = topicsMetadataResponse.brokers.map(b => (b.id, b)).toMap
     val filteredTopicMetadata = topicsMetadataResponse.topicsMetadata.filter(
         topicMetadata => if (topicWhiteListFiler.isTopicAllowed(topicMetadata.topic, excludeInternalTopics = false))
           true

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ebcdfd5/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index a2117b3..d34ddf5 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -143,7 +143,7 @@ object SerializationTestUtils {
   }
 
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
-    new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
+    new TopicMetadataResponse(brokers, Seq(topicmetaData1, topicmetaData2), 1)
   }
 
   def createTestOffsetCommitRequest: OffsetCommitRequest = {