You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/02/07 19:03:24 UTC
git commit: Minor log4j fixes for DefaultEventHandler to avoid
printing ByteBufferMessageSets in error messages. Printing them makes the
important part of the error message very difficult to read.
Updated Branches:
refs/heads/0.8 814c9709c -> b89fc2be8
Minor log4j fixes for DefaultEventHandler to avoid printing ByteBufferMessageSets in error messages. Printing them makes the important part of the error message very difficult to read.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b89fc2be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b89fc2be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b89fc2be
Branch: refs/heads/0.8
Commit: b89fc2be8a771a8a7b4a618805a6154ec0bda18b
Parents: 814c970
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Feb 7 10:03:19 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 7 10:03:19 2013 -0800
----------------------------------------------------------------------
.../kafka/producer/async/DefaultEventHandler.scala | 10 +++++-----
1 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b89fc2be/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 374cd6b..5569cc2 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -83,7 +83,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
producerStats.failedSendRate.mark()
val correlationIdEnd = correlationId.get()
- error("Failed to send the following requests with correlation ids in [%d,%d]: %s".format(correlationIdStart, correlationIdEnd-1, outstandingProduceRequests))
+ error("Failed to send requests for topics %s with correlation ids in [%d,%d]".format(outstandingProduceRequests.map(_.topic).mkString(","),
+ correlationIdStart, correlationIdEnd-1))
throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
}
}
@@ -228,8 +229,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
*/
private def send(brokerId: Int, messagesPerTopic: Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
- warn("Failed to send data %s since partitions %s don't have a leader".format(messagesPerTopic.map(_._2),
- messagesPerTopic.map(_._1.toString).mkString(",")))
+ warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
@@ -259,8 +259,8 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
failedTopicPartitions
} catch {
case t: Throwable =>
- warn("Failed to send producer request with correlation id %d to broker %d with data %s"
- .format(currentCorrelationId, brokerId, messagesPerTopic), t)
+ warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
+ .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
messagesPerTopic.keys.toSeq
}
} else {