You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/01/05 20:39:59 UTC
kafka git commit: KAFKA-3010; Include error in log when ack 0
Repository: kafka
Updated Branches:
refs/heads/trunk 23d607dc2 -> 5db147c1d
KAFKA-3010; Include error in log when ack 0
I verified this by producing to the internal __consumer_offsets topic with acks=0; the resulting log message looks like:
[2015-12-22 10:34:40,897] INFO [KafkaApi-0] Closing connection due to error during produce request with correlation id 1 from client id console-producer with ack=0
Topic and partition to exceptions: [__consumer_offsets,43] -> kafka.common.InvalidTopicException (kafka.server.KafkaApis)
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #709 from ijuma/kafka-3010-include-error-in-log-when-ack-0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5db147c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5db147c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5db147c1
Branch: refs/heads/trunk
Commit: 5db147c1d453c9dabcc277bd95435f17201b9c1c
Parents: 23d607d
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Jan 5 11:39:56 2016 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jan 5 11:39:56 2016 -0800
----------------------------------------------------------------------
.../src/main/scala/kafka/server/KafkaApis.scala | 31 +++++++++-----------
1 file changed, 14 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5db147c1/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8589400..cbf5031 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -218,10 +218,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => ErrorMapping.TopicAuthorizationCode)
mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
- // we only print warnings for known errors here; only replica manager could see an unknown
- // exception while trying to write the offset message to the local log, and it will log
- // an error message and write the error code in this case; hence it can be ignored here
- if (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.UnknownCode) {
+ if (errorCode != ErrorMapping.NoError) {
debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
.format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
@@ -315,19 +312,19 @@ class KafkaApis(val requestChannel: RequestChannel,
// the callback for sending a produce response
def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
- var errorInResponse = false
+
val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => ProducerResponseStatus(ErrorMapping.TopicAuthorizationCode, -1))
+ var errorInResponse = false
+
mergedResponseStatus.foreach { case (topicAndPartition, status) =>
- // we only print warnings for known errors here; if it is unknown, it will cause
- // an error message in the replica manager
- if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
+ if (status.error != ErrorMapping.NoError) {
+ errorInResponse = true
debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
produceRequest.correlationId,
produceRequest.clientId,
topicAndPartition,
ErrorMapping.exceptionNameFor(status.error)))
- errorInResponse = true
}
}
@@ -338,10 +335,14 @@ class KafkaApis(val requestChannel: RequestChannel,
// the request, since no response is expected by the producer, the server will close socket server so that
// the producer client will know that some error has happened and will refresh its metadata
if (errorInResponse) {
+ val exceptionsSummary = mergedResponseStatus.map { case (topicAndPartition, status) =>
+ topicAndPartition -> ErrorMapping.exceptionNameFor(status.error)
+ }.mkString(", ")
info(
- "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format(
- produceRequest.correlationId,
- produceRequest.clientId))
+ s"Closing connection due to error during produce request with correlation id ${produceRequest.correlationId} " +
+ s"from client id ${produceRequest.clientId} with ack=0\n" +
+ s"Topic and partition to exceptions: $exceptionsSummary"
+ )
requestChannel.closeConnection(request.processor, request)
} else {
requestChannel.noOperation(request.processor, request)
@@ -368,8 +369,6 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedRequestInfo.isEmpty)
sendResponseCallback(Map.empty)
else {
- // only allow appending to internal topic partitions
- // if the client is not from admin
val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
// call the replica manager to append messages to the replicas
@@ -404,9 +403,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val mergedResponseStatus = responsePartitionData ++ unauthorizedResponseStatus
mergedResponseStatus.foreach { case (topicAndPartition, data) =>
- // we only print warnings for known errors here; if it is unknown, it will cause
- // an error message in the replica manager already and hence can be ignored here
- if (data.error != ErrorMapping.NoError && data.error != ErrorMapping.UnknownCode) {
+ if (data.error != ErrorMapping.NoError) {
debug("Fetch request with correlation id %d from client %s on partition %s failed due to %s"
.format(fetchRequest.correlationId, fetchRequest.clientId,
topicAndPartition, ErrorMapping.exceptionNameFor(data.error)))