You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/05/21 19:06:42 UTC

git commit: KAFKA-901 follow-up changes to fix update metadata response handling and request logging

Updated Branches:
  refs/heads/0.8 eff59330f -> 2d40ca30d


KAFKA-901 follow-up changes to fix update metadata response handling and request logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d40ca30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d40ca30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d40ca30

Branch: refs/heads/0.8
Commit: 2d40ca30d34b2d4de35d5b811858b0f915451a19
Parents: eff5933
Author: Neha Narkhede <ne...@apache.org>
Authored: Tue May 21 10:01:51 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Tue May 21 10:01:51 2013 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala      |   15 +++++++++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala   |    8 ++++++--
 2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e8ae29..0c41d1d 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,6 +130,8 @@ class RequestSendThread(val controllerId: Int,
             response = LeaderAndIsrResponse.readFrom(receive.buffer)
           case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
+          case RequestKeys.UpdateMetadataKey =>
+            response = UpdateMetadataResponse.readFrom(receive.buffer)
         }
         stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
                                   .format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
@@ -157,9 +159,18 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
 
   def newBatch() {
     // raise error if the previous batch is not empty
-    if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0)
+    if(leaderAndIsrRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
-        "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+        "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+    if(stopReplicaRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+        "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
+    if(updateMetadataRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+        "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
+    if(stopAndDeleteReplicaRequestMap.size > 0)
+      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+        "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
     updateMetadataRequestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c5c4d5..93e2f04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,12 +101,13 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    val stateChangeLogger = replicaManager.stateChangeLogger
     if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
       val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
         "old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
         updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
         replicaManager.controllerEpoch)
-      replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage)
+      stateChangeLogger.warn(stateControllerEpochErrorMessage)
       throw new ControllerMovedException(stateControllerEpochErrorMessage)
     }
     partitionMetadataLock synchronized {
@@ -115,7 +116,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
         leaderCache.put(partitionState._1, partitionState._2)
-        debug("Caching leader info %s for partition %s".format(partitionState._2, partitionState._1))
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)