You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/14 01:16:22 UTC
git commit: kafka-1702; Messages silently Lost by producer;
patched by Alexis Midon; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk be2e8a769 -> d5041bc79
kafka-1702; Messages silently Lost by producer; patched by Alexis Midon; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5041bc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5041bc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5041bc7
Branch: refs/heads/trunk
Commit: d5041bc79fd656fe361c7f6643f9f26c2e8f22fe
Parents: be2e8a7
Author: Alexis Midon <mi...@apache.org>
Authored: Mon Oct 13 16:16:14 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Oct 13 16:16:14 2014 -0700
----------------------------------------------------------------------
.../producer/async/DefaultEventHandler.scala | 102 ++++++++++---------
1 file changed, 53 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d5041bc7/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 33470ff..821901e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
val partitionedDataOpt = partitionAndCollate(messages)
partitionedDataOpt match {
case Some(partitionedData) =>
- val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
- try {
- for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
- if (logger.isTraceEnabled)
- messagesPerBrokerMap.foreach(partitionAndEvent =>
- trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
- val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
- val failedTopicPartitions = send(brokerid, messageSetPerBroker)
- failedTopicPartitions.foreach(topicPartition => {
- messagesPerBrokerMap.get(topicPartition) match {
- case Some(data) => failedProduceRequests.appendAll(data)
- case None => // nothing
- }
- })
+ val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+ for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+ if (logger.isTraceEnabled) {
+ messagesPerBrokerMap.foreach(partitionAndEvent =>
+ trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+ }
+ val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+ messageSetPerBrokerOpt match {
+ case Some(messageSetPerBroker) =>
+ val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+ failedTopicPartitions.foreach(topicPartition => {
+ messagesPerBrokerMap.get(topicPartition) match {
+ case Some(data) => failedProduceRequests.appendAll(data)
+ case None => // nothing
+ }
+ })
+ case None => // failed to group messages
+ messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
}
- } catch {
- case t: Throwable => error("Failed to send messages", t)
}
failedProduceRequests
- case None => // all produce requests failed
+ case None => // failed to collate messages
messages
}
}
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
}
}
- private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+ private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
/** enforce the compressed.topics config here.
- * If the compression codec is anything other than NoCompressionCodec,
- * Enable compression only for specified topics if any
- * If the list of compressed topics is empty, then enable the specified compression codec for all topics
- * If the compression codec is NoCompressionCodec, compression is disabled for all topics
+ * If the compression codec is anything other than NoCompressionCodec,
+ * Enable compression only for specified topics if any
+ * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+ * If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
-
- val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
- val rawMessages = messages.map(_.message)
- ( topicAndPartition,
- config.compressionCodec match {
- case NoCompressionCodec =>
- debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
- debug("Sending %d messages with compression codec %d to %s"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition))
- new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
- case _ =>
- if(config.compressedTopics.contains(topicAndPartition.topic)) {
+ try {
+ val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+ val rawMessages = messages.map(_.message)
+ (topicAndPartition,
+ config.compressionCodec match {
+ case NoCompressionCodec =>
+ debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+ case _ =>
+ config.compressedTopics.size match {
+ case 0 =>
debug("Sending %d messages with compression codec %d to %s"
.format(messages.size, config.compressionCodec.codec, topicAndPartition))
new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
- }
- else {
- debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
- .format(messages.size, topicAndPartition, config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
- }
- }
- }
- )
+ case _ =>
+ if (config.compressedTopics.contains(topicAndPartition.topic)) {
+ debug("Sending %d messages with compression codec %d to %s"
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition))
+ new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+ }
+ else {
+ debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+ .format(messages.size, topicAndPartition, config.compressedTopics.toString))
+ new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+ }
+ }
+ }
+ )
+ }
+ Some(messagesPerTopicPartition)
+ } catch {
+ case t: Throwable => error("Failed to group messages", t); None
}
- messagesPerTopicPartition
}
def close() {