You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Xiaoyu Wang <xw...@rocketfuel.com> on 2014/12/18 22:22:50 UTC

Better handling of exception in kafka.producer.async.DefaultEventHandler

Hello,

I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler
file. Below is the dispatchSerializedData function. Looks like we are
catching exception outside the loop and purely logs an error message.
We then return failedProduceRequests.

In case one broker is having problem, messages that will be sent to
brokers after the problematic broker will NOT be included in the
failedTopicAndPartitions and will be ignored quietly. Is this correct?
Shall we change the code to catch exception for sending message to
each broker?

Thanks

private def dispatchSerializedData(messages:
Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
  val partitionedDataOpt = partitionAndCollate(messages)
  partitionedDataOpt match {
    case Some(partitionedData) =>
      val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
      try {

*      for ((brokerid, messagesPerBrokerMap) <- partitionedData) { *
      if (logger.isTraceEnabled)
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d,
Partitions: %s".format(partitionAndEvent._1, brokerid,
partitionAndEvent._2)))
          val messageSetPerBroker = groupMessagesToSet(messagesPerBrokerMap)

          val failedTopicPartitions = send(brokerid, messageSetPerBroker)
          failedTopicPartitions.foreach(topicPartition => {
            messagesPerBrokerMap.get(topicPartition) match {
              case Some(data) => failedProduceRequests.appendAll(data)
              case None => // nothing
            }
          })
        }



*   } catch {        case t: Throwable => error("Failed to send
messages", t)      }  *    failedProduceRequests
    case None => // all produce requests failed
      messages
  }
}

Re: Better handling of exception in kafka.producer.async.DefaultEventHandler

Posted by Shangan Chen <ch...@gmail.com>.
handle() will throw the exception to the caller. In sync mode, client will
receive the exception, but in async mode, there's an independent thread
actually doing the sending. It's not easy to inform the caller except for
blocking the queue.
In practice, you can configure the retry times, and each time
sendPartitionPerTopicCache
will be cleared, and the messages might thus go to another available broker
if you do not specify the partition key(partition key and partition class
might decide the message might go to the same partition even it failed)

how handle() throw exception:

   if(outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      error("Failed to send requests for topics %s with correlation ids in
[%d,%d]"
        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
        correlationIdStart, correlationIdEnd-1))
      throw new FailedToSendMessageException("Failed to send messages after
" + config.messageSendMaxRetries + " tries.", null)
    }

On Fri, Dec 19, 2014 at 5:22 AM, Xiaoyu Wang <xw...@rocketfuel.com> wrote:

> Hello,
>
> I am looking at 0.8.1.1, the kafka.producer.async.DefaultEventHandler
> file. Below is the dispatchSerializedData function. Looks like we are
> catching exception outside the loop and purely logs an error message.
> We then return failedProduceRequests.
>
> In case one broker is having problem, messages that will be sent to
> brokers after the problematic broker will NOT be included in the
> failedTopicAndPartitions and will be ignored quietly. Is this correct?
> Shall we change the code to catch exception for sending message to
> each broker?
>
> Thanks
>
> private def dispatchSerializedData(messages:
> Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
>   val partitionedDataOpt = partitionAndCollate(messages)
>   partitionedDataOpt match {
>     case Some(partitionedData) =>
>       val failedProduceRequests = new ArrayBuffer[KeyedMessage[K,Message]]
>       try {
>
> *      for ((brokerid, messagesPerBrokerMap) <- partitionedData) { *
>       if (logger.isTraceEnabled)
>             messagesPerBrokerMap.foreach(partitionAndEvent =>
>               trace("Handling event for Topic: %s, Broker: %d,
> Partitions: %s".format(partitionAndEvent._1, brokerid,
> partitionAndEvent._2)))
>           val messageSetPerBroker =
> groupMessagesToSet(messagesPerBrokerMap)
>
>           val failedTopicPartitions = send(brokerid, messageSetPerBroker)
>           failedTopicPartitions.foreach(topicPartition => {
>             messagesPerBrokerMap.get(topicPartition) match {
>               case Some(data) => failedProduceRequests.appendAll(data)
>               case None => // nothing
>             }
>           })
>         }
>
>
>
> *   } catch {        case t: Throwable => error("Failed to send
> messages", t)      }  *    failedProduceRequests
>     case None => // all produce requests failed
>       messages
>   }
> }
>



-- 
have a good day!
chenshang'an