You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/05/21 19:06:42 UTC
git commit: KAFKA-901 follow up changes to fix update metadata
response handling and request logging
Updated Branches:
refs/heads/0.8 eff59330f -> 2d40ca30d
KAFKA-901 follow up changes to fix update metadata response handling and request logging
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2d40ca30
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2d40ca30
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2d40ca30
Branch: refs/heads/0.8
Commit: 2d40ca30d34b2d4de35d5b811858b0f915451a19
Parents: eff5933
Author: Neha Narkhede <ne...@apache.org>
Authored: Tue May 21 10:01:51 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Tue May 21 10:01:51 2013 -0700
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 15 +++++++++++++--
core/src/main/scala/kafka/server/KafkaApis.scala | 8 ++++++--
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 7e8ae29..0c41d1d 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -130,6 +130,8 @@ class RequestSendThread(val controllerId: Int,
response = LeaderAndIsrResponse.readFrom(receive.buffer)
case RequestKeys.StopReplicaKey =>
response = StopReplicaResponse.readFrom(receive.buffer)
+ case RequestKeys.UpdateMetadataKey =>
+ response = UpdateMetadataResponse.readFrom(receive.buffer)
}
stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %d"
.format(controllerId, controllerContext.epoch, response.correlationId, toBrokerId))
@@ -157,9 +159,18 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
def newBatch() {
// raise error if the previous batch is not empty
- if(leaderAndIsrRequestMap.size > 0 || stopReplicaRequestMap.size > 0)
+ if(leaderAndIsrRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating " +
- "a new one. Some state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+ "a new one. Some LeaderAndIsr state changes %s might be lost ".format(leaderAndIsrRequestMap.toString()))
+ if(stopReplicaRequestMap.size > 0)
+ throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+ "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
+ if(updateMetadataRequestMap.size > 0)
+ throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+ "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
+ if(stopAndDeleteReplicaRequestMap.size > 0)
+ throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
+ "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
updateMetadataRequestMap.clear()
http://git-wip-us.apache.org/repos/asf/kafka/blob/2d40ca30/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c5c4d5..93e2f04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,12 +101,13 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+ val stateChangeLogger = replicaManager.stateChangeLogger
if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
"old controller %d with epoch %d. Latest known controller epoch is %d").format(brokerId,
updateMetadataRequest.correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
replicaManager.controllerEpoch)
- replicaManager.stateChangeLogger.warn(stateControllerEpochErrorMessage)
+ stateChangeLogger.warn(stateControllerEpochErrorMessage)
throw new ControllerMovedException(stateControllerEpochErrorMessage)
}
partitionMetadataLock synchronized {
@@ -115,7 +116,10 @@ class KafkaApis(val requestChannel: RequestChannel,
updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
leaderCache.put(partitionState._1, partitionState._2)
- debug("Caching leader info %s for partition %s".format(partitionState._2, partitionState._1))
+ if(stateChangeLogger.isTraceEnabled)
+ stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
+ "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
+ updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
}
}
val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)