Posted to commits@kafka.apache.org by ju...@apache.org on 2014/10/14 01:16:22 UTC

git commit: kafka-1702; Messages silently lost by producer; patched by Alexis Midon; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk be2e8a769 -> d5041bc79


kafka-1702; Messages silently lost by producer; patched by Alexis Midon; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5041bc7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5041bc7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5041bc7

Branch: refs/heads/trunk
Commit: d5041bc79fd656fe361c7f6643f9f26c2e8f22fe
Parents: be2e8a7
Author: Alexis Midon <mi...@apache.org>
Authored: Mon Oct 13 16:16:14 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Oct 13 16:16:14 2014 -0700

----------------------------------------------------------------------
 .../producer/async/DefaultEventHandler.scala    | 102 ++++++++++---------
 1 file changed, 53 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
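In short: the old async producer wrapped the whole per-broker send loop in a single try/catch that only logged any Throwable, so an exception thrown while grouping a broker's messages (for example, during compression) meant those messages never reached failedProduceRequests and were never retried. With this patch, groupMessagesToSet catches internally and returns an Option; a None result adds every message destined for that broker to failedProduceRequests instead of dropping it.

A minimal, self-contained sketch of the new control flow (hypothetical names such as RetrySketch, riskyGroup, and dispatch; not the actual handler code):

    import scala.collection.mutable.ArrayBuffer

    object RetrySketch {
      type Msg = String

      // Stands in for groupMessagesToSet: it may still fail internally,
      // but it now signals failure with None instead of throwing past
      // the caller.
      def riskyGroup(batch: Seq[Msg]): Option[Seq[Msg]] =
        try Some(batch.map(_.toUpperCase))
        catch { case t: Throwable => None }

      // Stands in for the per-broker loop in the first hunk below:
      // everything that could not be grouped is collected so the caller
      // can retry it.
      def dispatch(batchesPerBroker: Map[Int, Seq[Msg]]): Seq[Msg] = {
        val failed = new ArrayBuffer[Msg]
        for ((brokerId, batch) <- batchesPerBroker) {
          riskyGroup(batch) match {
            case Some(grouped) => () // send(brokerId, grouped) would happen here
            case None => failed ++= batch // keep for retry, do not drop silently
          }
        }
        failed
      }
    }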


http://git-wip-us.apache.org/repos/asf/kafka/blob/d5041bc7/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index 33470ff..821901e 100644
--- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
-        try {
-          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-            if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent =>
-                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
-                case None => // nothing
-              }
-            })
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+          if (logger.isTraceEnabled) {
+            messagesPerBrokerMap.foreach(partitionAndEvent =>
+              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+          }
+          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+          messageSetPerBrokerOpt match {
+            case Some(messageSetPerBroker) =>
+              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+              failedTopicPartitions.foreach(topicPartition => {
+                messagesPerBrokerMap.get(topicPartition) match {
+                  case Some(data) => failedProduceRequests.appendAll(data)
+                  case None => // nothing
+                }
+              })
+            case None => // failed to group messages
+              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
           }
-        } catch {
-          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
-      case None => // all produce requests failed
+      case None => // failed to collate messages
         messages
     }
   }
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
     /** enforce the compressed.topics config here.
-      *  If the compression codec is anything other than NoCompressionCodec,
-      *    Enable compression only for specified topics if any
-      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      * If the compression codec is anything other than NoCompressionCodec,
+      * Enable compression only for specified topics if any
+      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
       */
-
-    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-      val rawMessages = messages.map(_.message)
-      ( topicAndPartition,
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                debug("Sending %d messages with compression codec %d to %s"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+    try {
+      val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+        val rawMessages = messages.map(_.message)
+        (topicAndPartition,
+          config.compressionCodec match {
+            case NoCompressionCodec =>
+              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+            case _ =>
+              config.compressedTopics.size match {
+                case 0 =>
                   debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                }
-                else {
-                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
-                    .format(messages.size, topicAndPartition, config.compressedTopics.toString))
-                  new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-                }
-            }
-        }
-        )
+                case _ =>
+                  if (config.compressedTopics.contains(topicAndPartition.topic)) {
+                    debug("Sending %d messages with compression codec %d to %s"
+                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
+                    new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+                  }
+                  else {
+                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
+                    new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+                  }
+              }
+          }
+          )
+      }
+      Some(messagesPerTopicPartition)
+    } catch {
+      case t: Throwable => error("Failed to group messages", t); None
     }
-    messagesPerTopicPartition
   }
 
   def close() {
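
For reference, the compressed.topics rule restated in the patched comment above boils down to a single pure function (a sketch only; pickCodec is a hypothetical helper, assuming the codec types from kafka.message in this same codebase):

    import kafka.message.{CompressionCodec, NoCompressionCodec}

    // Hypothetical summary of the branching in groupMessagesToSet:
    def pickCodec(configured: CompressionCodec,
                  compressedTopics: Seq[String],
                  topic: String): CompressionCodec =
      configured match {
        case NoCompressionCodec => NoCompressionCodec            // compression disabled for all topics
        case codec if compressedTopics.isEmpty => codec          // empty list: compress every topic
        case codec if compressedTopics.contains(topic) => codec  // listed topic: compress
        case _ => NoCompressionCodec                             // unlisted topic: send uncompressed
      }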