Posted to dev@kafka.apache.org by "Alexis Midon (JIRA)" <ji...@apache.org> on 2014/10/13 01:45:33 UTC

[jira] [Updated] (KAFKA-1702) Messages silently Lost by producer

     [ https://issues.apache.org/jira/browse/KAFKA-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexis Midon updated KAFKA-1702:
--------------------------------
    Status: Patch Available  (was: Open)

diff --git core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
index d8ac915..0f7f941 100644
--- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
+++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
@@ -95,27 +95,28 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     val partitionedDataOpt = partitionAndCollate(messages)
     partitionedDataOpt match {
       case Some(partitionedData) =>
-        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
-        try {
-          for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
-            if (logger.isTraceEnabled)
-              messagesPerBrokerMap.foreach(partitionAndEvent =>
-                trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
-            val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)
-
-            val failedTopicPartitions = send(brokerid, messageSetPerBroker)
-            failedTopicPartitions.foreach(topicPartition => {
-              messagesPerBrokerMap.get(topicPartition) match {
-                case Some(data) => failedProduceRequests.appendAll(data)
-                case None => // nothing
-              }
-            })
+        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
+        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
+          if (logger.isTraceEnabled) {
+            messagesPerBrokerMap.foreach(partitionAndEvent =>
+              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+          }
+          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
+          messageSetPerBrokerOpt match {
+            case Some(messageSetPerBroker) =>
+              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+              failedTopicPartitions.foreach(topicPartition => {
+                messagesPerBrokerMap.get(topicPartition) match {
+                  case Some(data) => failedProduceRequests.appendAll(data)
+                  case None => // nothing
+                }
+              })
+            case None => // failed to group messages
+              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
           }
-        } catch {
-          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
-      case None => // all produce requests failed
+      case None => // failed to collate messages
         messages
     }
   }
@@ -290,43 +291,46 @@ class DefaultEventHandler[K,V](config: ProducerConfig,
     }
   }
 
-  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]) = {
+  private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]) = {
     /** enforce the compressed.topics config here.
-      *  If the compression codec is anything other than NoCompressionCodec,
-      *    Enable compression only for specified topics if any
-      *    If the list of compressed topics is empty, then enable the specified compression codec for all topics
-      *  If the compression codec is NoCompressionCodec, compression is disabled for all topics
+      * If the compression codec is anything other than NoCompressionCodec,
+      * Enable compression only for specified topics if any
+      * If the list of compressed topics is empty, then enable the specified compression codec for all topics
+      * If the compression codec is NoCompressionCodec, compression is disabled for all topics
       */
-
-    val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
-      val rawMessages = messages.map(_.message)
-      ( topicAndPartition,
-        config.compressionCodec match {
-          case NoCompressionCodec =>
-            debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
-            new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-          case _ =>
-            config.compressedTopics.size match {
-              case 0 =>
-                debug("Sending %d messages with compression codec %d to %s"
-                  .format(messages.size, config.compressionCodec.codec, topicAndPartition))
-                new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-              case _ =>
-                if(config.compressedTopics.contains(topicAndPartition.topic)) {
+    try {
+      val messagesPerTopicPartition = messagesPerTopicAndPartition.map { case (topicAndPartition, messages) =>
+        val rawMessages = messages.map(_.message)
+        (topicAndPartition,
+          config.compressionCodec match {
+            case NoCompressionCodec =>
+              debug("Sending %d messages with no compression to %s".format(messages.size, topicAndPartition))
+              new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+            case _ =>
+              config.compressedTopics.size match {
+                case 0 =>
                   debug("Sending %d messages with compression codec %d to %s"
                     .format(messages.size, config.compressionCodec.codec, topicAndPartition))
                   new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
-                }
-                else {
-                  debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
-                    .format(messages.size, topicAndPartition, config.compressedTopics.toString))
-                  new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
-                }
-            }
-        }
-        )
+                case _ =>
+                  if (config.compressedTopics.contains(topicAndPartition.topic)) {
+                    debug("Sending %d messages with compression codec %d to %s"
+                      .format(messages.size, config.compressionCodec.codec, topicAndPartition))
+                    new ByteBufferMessageSet(config.compressionCodec, rawMessages: _*)
+                  }
+                  else {
+                    debug("Sending %d messages to %s with no compression as it is not in compressed.topics - %s"
+                      .format(messages.size, topicAndPartition, config.compressedTopics.toString))
+                    new ByteBufferMessageSet(NoCompressionCodec, rawMessages: _*)
+                  }
+              }
+          }
+          )
+      }
+      Some(messagesPerTopicPartition)
+    } catch {
+      case t: Throwable => error("Failed to group messages", t); None
     }
-    messagesPerTopicPartition
   }
 
   def close() {


> Messages silently Lost by producer
> ----------------------------------
>
>                 Key: KAFKA-1702
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1702
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.1.1
>            Reporter: Alexis Midon
>            Assignee: Jun Rao
>         Attachments: KAFKA-1702.0.patch
>
>
> Hello,
> we lost millions of messages because of this {{try/catch}} in the producer {{DefaultEventHandler}}:
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala#L114-L116
> If a Throwable is caught by this {{try/catch}}, the retry policy will have no effect and all yet-to-be-sent messages are lost (the error will break the loop over the broker list).
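> For illustration only, here is a minimal sketch of that control flow ({{sendToBroker}} below is a hypothetical stand-in for {{groupMessagesToSet}} + {{send}}, not the actual producer code):
> {code}
> import scala.collection.mutable.ArrayBuffer
>
> // Hypothetical stand-in: handling broker 1 blows up with a Throwable.
> def sendToBroker(broker: Int, msgs: Seq[String]): Seq[String] =
>   if (broker == 1) throw new NoClassDefFoundError("Could not initialize class org.xerial.snappy.Snappy")
>   else Seq.empty // no failed messages to retry
>
> val failed = ArrayBuffer[String]()
> try {
>   for ((broker, msgs) <- Map(1 -> Seq("m1"), 2 -> Seq("m2"))) {
>     // the Throwable raised for broker 1 aborts the whole loop: broker 2's messages
>     // are neither sent nor appended to `failed`, so the retry pass never sees them
>     failed ++= sendToBroker(broker, msgs)
>   }
> } catch {
>   case t: Throwable => println("Failed to send messages: " + t) // only logged
> }
> println(failed) // empty: nothing is retried, the pending messages are silently dropped
> {code}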
> This issue is very hard to detect because the producer (async or sync) cannot even catch the error, and *all* the metrics are updated as if everything were fine.
> Only an abnormal drop in the producers' network I/O or in the incoming message rate on the brokers, or alerting on errors in the producer logs, could have revealed the issue.
> This behavior was introduced by KAFKA-300. I can't see a good reason for it, so here is a patch that lets the retry policy do its job when such a {{Throwable}} occurs.
> Thanks in advance for your help.
> Alexis
> ps: you might wonder how this {{try/catch}} could ever catch anything, since {{DefaultEventHandler#groupMessagesToSet}} looks so harmless.
> Here are the details:
> We use Snappy compression. When the native snappy library is not installed on the host, Snappy, during the initialization of the class {{org.xerial.snappy.Snappy}}, will [write a C library|https://github.com/xerial/snappy-java/blob/1.1.0/src/main/java/org/xerial/snappy/SnappyLoader.java#L312] in the JVM temp directory {{java.io.tmpdir}}.
> In our scenario, {{java.io.tmpdir}} was a subdirectory of {{/tmp}}. After an instance reboot (thank you [AWS|https://twitter.com/hashtag/AWSReboot?src=hash]!), the JVM temp directory was removed. The JVM was then running with a non-existent temp dir. The Snappy class would be impossible to initialize, and the following message would be silently logged:
> {code}
> ERROR [2014-10-07 22:23:56,530] kafka.producer.async.DefaultEventHandler: Failed to send messages
> ! java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
> {code}
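> To reproduce the failure outside of Kafka, a quick sketch (assuming snappy-java 1.1.0 on the classpath, no native snappy installed, and a made-up non-existent path; the exact exceptions may vary by version):
> {code}
> // Must run in a fresh JVM, before org.xerial.snappy.Snappy is first initialized.
> System.setProperty("java.io.tmpdir", "/tmp/does-not-exist") // hypothetical missing dir
> try {
>   org.xerial.snappy.Snappy.compress("hello".getBytes("UTF-8"))
> } catch {
>   // The first use typically fails while extracting the native library
>   // (ExceptionInInitializerError / SnappyError); every later use in the same JVM
>   // then fails with: NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy
>   case t: Throwable => println(t)
> }
> {code}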



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)