You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/24 20:52:08 UTC
git commit: KAFKA-696 Fix toString() API for all requests to make
logging easier to read; reviewed by Neha Narkhede, Jun Rao
Updated Branches:
refs/heads/0.8 a15f1f2d8 -> 8d41620a4
KAFKA-696 Fix toString() API for all requests to make logging easier to read; reviewed by Neha Narkhede, Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d41620a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d41620a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d41620a
Branch: refs/heads/0.8
Commit: 8d41620a427a027d212c241ae105d09cd470e64f
Parents: a15f1f2
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Thu Jan 24 11:50:18 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Jan 24 11:51:29 2013 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/api/FetchRequest.scala | 25 +++++-
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 24 +++++
core/src/main/scala/kafka/api/OffsetRequest.scala | 24 ++++-
.../src/main/scala/kafka/api/ProducerRequest.scala | 24 ++++-
.../main/scala/kafka/api/RequestOrResponse.scala | 8 +-
.../main/scala/kafka/api/StopReplicaRequest.scala | 27 +++++-
.../scala/kafka/api/TopicMetadataRequest.scala | 23 ++++-
core/src/main/scala/kafka/cluster/Partition.scala | 4 +-
.../main/scala/kafka/network/RequestChannel.scala | 11 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 81 +--------------
10 files changed, 156 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 1bfabb0..ac74931 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -21,9 +21,10 @@ import java.nio.ByteBuffer
import kafka.utils.nonthreadsafe
import kafka.api.ApiUtils._
import scala.collection.immutable.Map
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.consumer.ConsumerConfig
import java.util.concurrent.atomic.AtomicInteger
+import kafka.network.{RequestChannel}
case class PartitionFetchInfo(offset: Long, fetchSize: Int)
@@ -137,6 +138,28 @@ case class FetchRequest private[kafka] (versionId: Short = FetchRequest.CurrentV
def isFromLowLevelConsumer = replicaId == Request.DebuggingConsumerId
def numPartitions = requestInfo.size
+
+ override def toString(): String = {
+ val fetchRequest = new StringBuilder
+ fetchRequest.append("Name: " + this.getClass.getSimpleName)
+ fetchRequest.append("; Version: " + versionId)
+ fetchRequest.append("; CorrelationId: " + correlationId)
+ fetchRequest.append("; ClientId: " + clientId)
+ fetchRequest.append("; ReplicaId: " + replicaId)
+ fetchRequest.append("; MaxWait: " + maxWait + " ms")
+ fetchRequest.append("; MinBytes: " + minBytes + " bytes")
+ fetchRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+ fetchRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val fetchResponsePartitionData = requestInfo.map {
+ case (topicAndPartition, data) =>
+ (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
+ }
+ val errorResponse = FetchResponse(correlationId, fetchResponsePartitionData)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 6955433..616f679 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -23,6 +23,9 @@ import kafka.utils._
import kafka.api.ApiUtils._
import kafka.cluster.Broker
import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
object LeaderAndIsr {
@@ -157,4 +160,25 @@ case class LeaderAndIsrRequest (versionId: Short,
size += broker.sizeInBytes /* broker info */
size
}
+
+ override def toString(): String = {
+ val leaderAndIsrRequest = new StringBuilder
+ leaderAndIsrRequest.append("Name: " + this.getClass.getSimpleName)
+ leaderAndIsrRequest.append("; Version: " + versionId)
+ leaderAndIsrRequest.append("; CorrelationId: " + correlationId)
+ leaderAndIsrRequest.append("; ClientId: " + clientId)
+ leaderAndIsrRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+ leaderAndIsrRequest.append("; ControllerEpoch: " + controllerEpoch)
+ leaderAndIsrRequest.append("; PartitionStateInfo: " + partitionStateInfos.mkString(","))
+ leaderAndIsrRequest.append("; Leaders: " + leaders.mkString(","))
+ leaderAndIsrRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val responseMap = partitionStateInfos.map {
+ case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ val errorResponse = LeaderAndIsrResponse(correlationId, responseMap)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index 6c522bc..6360a98 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -18,8 +18,10 @@
package kafka.api
import java.nio.ByteBuffer
-import kafka.common.TopicAndPartition
+import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.api.ApiUtils._
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
object OffsetRequest {
@@ -104,4 +106,24 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
def isFromOrdinaryClient = replicaId == Request.OrdinaryConsumerId
def isFromDebuggingClient = replicaId == Request.DebuggingConsumerId
+
+ override def toString(): String = {
+ val offsetRequest = new StringBuilder
+ offsetRequest.append("Name: " + this.getClass.getSimpleName)
+ offsetRequest.append("; Version: " + versionId)
+ offsetRequest.append("; CorrelationId: " + correlationId)
+ offsetRequest.append("; ClientId: " + clientId)
+ offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
+ offsetRequest.append("; ReplicaId: " + replicaId)
+ offsetRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val partitionOffsetResponseMap = requestInfo.map {
+ case (topicAndPartition, partitionOffsetRequest) =>
+ (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
+ }
+ val errorResponse = OffsetResponse(correlationId, partitionOffsetResponseMap)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index ffa96a6..72b2cba 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
import java.nio._
import kafka.message._
import scala.collection.Map
-import kafka.common.TopicAndPartition
import kafka.api.ApiUtils._
+import kafka.common._
+import kafka.network.RequestChannel.Response
+import kafka.network.{RequestChannel, BoundedByteBufferSend}
object ProducerRequest {
val CurrentVersion = 0.shortValue
@@ -120,5 +122,25 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
def numPartitions = data.size
+ override def toString(): String = {
+ val producerRequest = new StringBuilder
+ producerRequest.append("Name: " + this.getClass.getSimpleName)
+ producerRequest.append("; Version: " + versionId)
+ producerRequest.append("; CorrelationId: " + correlationId)
+ producerRequest.append("; ClientId: " + clientId)
+ producerRequest.append("; RequiredAcks: " + requiredAcks)
+ producerRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+ producerRequest.append("; TopicAndPartition: " + data.map(r => r._1 -> r._2.sizeInBytes).toMap.mkString(","))
+ producerRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val producerResponseStatus = data.map {
+ case (topicAndPartition, data) =>
+ (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1l))
+ }
+ val errorResponse = ProducerResponse(correlationId, producerResponseStatus)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/RequestOrResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestOrResponse.scala b/core/src/main/scala/kafka/api/RequestOrResponse.scala
index 83ad42c..3175e1c 100644
--- a/core/src/main/scala/kafka/api/RequestOrResponse.scala
+++ b/core/src/main/scala/kafka/api/RequestOrResponse.scala
@@ -18,6 +18,8 @@ package kafka.api
*/
import java.nio._
+import kafka.network.RequestChannel
+import kafka.utils.Logging
object Request {
val OrdinaryConsumerId: Int = -1
@@ -25,10 +27,12 @@ object Request {
}
-private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) {
+private[kafka] abstract class RequestOrResponse(val requestId: Option[Short] = None) extends Logging{
def sizeInBytes: Int
def writeTo(buffer: ByteBuffer): Unit
-
+
+ def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {}
}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 9fe849b..0580636 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -20,8 +20,10 @@ package kafka.api
import java.nio._
import kafka.api.ApiUtils._
-import kafka.utils.Logging
-import kafka.network.InvalidRequestException
+import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.{Logging}
object StopReplicaRequest extends Logging {
@@ -93,4 +95,25 @@ case class StopReplicaRequest(versionId: Short,
}
size
}
+
+ override def toString(): String = {
+ val stopReplicaRequest = new StringBuilder
+ stopReplicaRequest.append("Name: " + this.getClass.getSimpleName)
+ stopReplicaRequest.append("; Version: " + versionId)
+ stopReplicaRequest.append("; CorrelationId: " + correlationId)
+ stopReplicaRequest.append("; ClientId: " + clientId)
+ stopReplicaRequest.append("; AckTimeoutMs: " + ackTimeoutMs + " ms")
+ stopReplicaRequest.append("; DeletePartitions: " + deletePartitions)
+ stopReplicaRequest.append("; ControllerEpoch: " + controllerEpoch)
+ stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
+ stopReplicaRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val responseMap = partitions.map {
+ case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }.toMap
+ val errorResponse = StopReplicaResponse(correlationId, responseMap)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index fe1170f..824f8f1 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -20,7 +20,10 @@ package kafka.api
import java.nio.ByteBuffer
import kafka.api.ApiUtils._
import collection.mutable.ListBuffer
-import kafka.utils.Logging
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.common.ErrorMapping
+import kafka.network.RequestChannel.Response
+import kafka.utils.{Logging}
object TopicMetadataRequest extends Logging {
val CurrentVersion = 0.shortValue
@@ -67,4 +70,22 @@ case class TopicMetadataRequest(val versionId: Short,
4 + /* number of topics */
topics.foldLeft(0)(_ + shortStringLength(_)) /* topics */
}
+
+ override def toString(): String = {
+ val topicMetadataRequest = new StringBuilder
+ topicMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+ topicMetadataRequest.append("; Version: " + versionId)
+ topicMetadataRequest.append("; CorrelationId: " + correlationId)
+ topicMetadataRequest.append("; ClientId: " + clientId)
+ topicMetadataRequest.append("; Topics: " + topics.mkString(","))
+ topicMetadataRequest.toString()
+ }
+
+ override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+ val topicMetadata = topics.map {
+ topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ }
+ val errorResponse = TopicMetadataResponse(topicMetadata, correlationId)
+ requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ea5b5a0..71eb980 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -337,8 +337,8 @@ class Partition(val topic: String,
partitionString.append("Topic: " + topic)
partitionString.append("; Partition: " + partitionId)
partitionString.append("; Leader: " + leaderReplicaIdOpt)
- partitionString.append("; Assigned replicas: " + assignedReplicaMap.keys.mkString(","))
- partitionString.append("; In Sync replicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
+ partitionString.append("; AssignedReplicas: " + assignedReplicaMap.keys.mkString(","))
+ partitionString.append("; InSyncReplicas: " + inSyncReplicas.map(_.brokerId).mkString(","))
partitionString.toString()
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 848c877..5185dec 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -46,12 +46,7 @@ object RequestChannel extends Logging {
val requestId = buffer.getShort()
val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
buffer.rewind()
- buffer.getShort
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = ApiUtils.readShortString(buffer)
- buffer.rewind()
- trace("Received request v%d with correlation id %d from client %s: %s".format(versionId, correlationId, clientId, requestObj))
+ trace("Received request : %s".format(requestObj))
def updateRequestMetrics() {
val endTimeMs = SystemTime.milliseconds
@@ -80,8 +75,8 @@ object RequestChannel extends Logging {
m.responseSendTimeHist.update(responseSendTime)
m.totalTimeHist.update(totalTime)
}
- trace("Completed request v%d with correlation id %d and client %s: %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
- .format(versionId, correlationId, clientId, requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
+ trace("Completed request : %s, totalTime:%d, queueTime:%d, localTime:%d, remoteTime:%d, sendTime:%d"
+ .format(requestObj, totalTime, queueTime, apiLocalTime, apiRemoteTime, responseSendTime))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/8d41620a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 60752fb..0a1a11a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -54,6 +54,8 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handle(request: RequestChannel.Request) {
try{
+ if(requestLogger.isTraceEnabled)
+ requestLogger.trace("Handling request: %s".format(request.requestObj))
request.requestId match {
case RequestKeys.ProduceKey => handleProducerRequest(request)
case RequestKeys.FetchKey => handleFetchRequest(request)
@@ -65,68 +67,14 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} catch {
case e: Throwable =>
- request.requestId match {
- case RequestKeys.ProduceKey =>
- val apiRequest = request.requestObj.asInstanceOf[ProducerRequest]
- val producerResponseStatus = apiRequest.data.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, ProducerResponseStatus(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L))
- }
- val errorResponse = ProducerResponse(apiRequest.correlationId, producerResponseStatus)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
- error("error when handling request %s".format(apiRequest), e)
- case RequestKeys.FetchKey =>
- val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
- val fetchResponsePartitionData = apiRequest.requestInfo.map {
- case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
- }
- val errorResponse = FetchResponse(apiRequest.correlationId, fetchResponsePartitionData)
- requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
- error("error when handling request %s".format(apiRequest), e)
- case RequestKeys.OffsetsKey =>
- val apiRequest = request.requestObj.asInstanceOf[OffsetRequest]
- val partitionOffsetResponseMap = apiRequest.requestInfo.map {
- case (topicAndPartition, partitionOffsetRequest) =>
- (topicAndPartition, PartitionOffsetsResponse(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), null))
- }
- val errorResponse = OffsetResponse(apiRequest.correlationId, partitionOffsetResponseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
- error("error when handling request %s".format(apiRequest), e)
- case RequestKeys.MetadataKey =>
- val apiRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
- val topicMeatadata = apiRequest.topics.map {
- topic => TopicMetadata(topic, Nil, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }
- val errorResponse = TopicMetadataResponse(topicMeatadata, apiRequest.correlationId)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
- error("error when handling request %s".format(apiRequest), e)
- case RequestKeys.LeaderAndIsrKey =>
- val apiRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
- val responseMap = apiRequest.partitionStateInfos.map {
- case (topicAndPartition, partitionAndState) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }
- val errorResponse = LeaderAndIsrResponse(apiRequest.correlationId, responseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
- error("error when handling request %s".format(apiRequest), e)
- case RequestKeys.StopReplicaKey =>
- val apiRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
- val responseMap = apiRequest.partitions.map {
- case topicAndPartition => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
- }.toMap
- error("error when handling request %s".format(apiRequest), e)
- val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
- requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
- }
+ request.requestObj.handleError(e, requestChannel, request)
+ error("error when handling request %s".format(request.requestObj), e)
} finally
request.apiLocalCompleteTimeMs = SystemTime.milliseconds
}
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling LeaderAndIsrRequest v%d with correlation id %d from client %s: %s"
- .format(leaderAndIsrRequest.versionId, leaderAndIsrRequest.correlationId, leaderAndIsrRequest.clientId, leaderAndIsrRequest.toString))
try {
val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
@@ -141,14 +89,9 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleStopReplicaRequest(request: RequestChannel.Request) {
val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling StopReplicaRequest v%d with correlation id %d from client %s: %s"
- .format(stopReplicaRequest.versionId, stopReplicaRequest.correlationId, stopReplicaRequest.clientId, stopReplicaRequest.toString))
-
val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
-
replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
}
@@ -174,10 +117,6 @@ class KafkaApis(val requestChannel: RequestChannel,
def handleProducerRequest(request: RequestChannel.Request) {
val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
val sTime = SystemTime.milliseconds
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling ProducerRequest v%d with correlation id %d from client %s: %s"
- .format(produceRequest.versionId, produceRequest.correlationId, produceRequest.clientId, produceRequest.toString))
-
val localProduceResults = appendToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -272,10 +211,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleFetchRequest(request: RequestChannel.Request) {
val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling FetchRequest v%d with correlation id %d from client %s: %s"
- .format(fetchRequest.versionId, fetchRequest.correlationId, fetchRequest.clientId, fetchRequest.toString))
-
if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHw(fetchRequest)
// after updating HW, some delayed produce requests may be unblocked
@@ -382,10 +317,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleOffsetRequest(request: RequestChannel.Request) {
val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling OffsetRequest v%d with correlation id %d from client %s: %s"
- .format(offsetRequest.versionId, offsetRequest.correlationId, offsetRequest.clientId, offsetRequest.toString))
-
val responseMap = offsetRequest.requestInfo.map(elem => {
val (topicAndPartition, partitionOffsetRequestInfo) = elem
try {
@@ -422,10 +353,6 @@ class KafkaApis(val requestChannel: RequestChannel,
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
- if(requestLogger.isTraceEnabled)
- requestLogger.trace("Handling TopicMetadataRequest v%d with correlation id %d from client %s: %s"
- .format(metadataRequest.versionId, metadataRequest.correlationId, metadataRequest.clientId, metadataRequest.toString))
-
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val config = replicaManager.config
val uniqueTopics = {