You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Apurva Mehta (JIRA)" <ji...@apache.org> on 2017/07/19 21:02:00 UTC

[jira] [Updated] (KAFKA-5610) KafkaApis.handleWriteTxnMarkerRequest can return UNSUPPORTED_FOR_MESSAGE_FORMAT error on partition emigration

     [ https://issues.apache.org/jira/browse/KAFKA-5610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apurva Mehta updated KAFKA-5610:
--------------------------------
    Description: 
This bug was revealed by the following system test failure http://confluent-systest.s3-website-us-west-2.amazonaws.com/confluent-kafka-system-test-results/?prefix=2017-07-18--001.1500383975--apache--trunk--28c83d9/

What happened was that a commit marker to the offsets topic (sent as part of the {{producer.sendOffsetsToTransaction}} method) was lost, causing data to be reprocessed, and hence causing the test to fail. 

The bug is that the wrong error code is returned from the handleWriteTxnMarker request when there is partition emigration. In particular, we have: 

{code:java}
for (marker <- markers.asScala) {
      val producerId = marker.producerId
      val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition =>
        replicaManager.getMagic(partition) match {
          case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
          case _ => false
        }
      }

      if (partitionsWithIncorrectMessageFormat.nonEmpty) {
        val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
        partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
        updateErrors(producerId, currentErrors)
      }
{code}

But the {{replicaManager.getMagic()}} call will return {{None}} when the partition emigrates, causing the {{handleWriteTxnMarkersRequest}} to return an {{UNSUPPORTED_FOR_MESSAGE_FORMAT}} error. 

From the log, we see that the partition did emigrate a few milliseconds before the {{WriteTxnMarkerRequest}} was sent.

On the old leader, worker10:
{noformat}
./worker10/debug/server.log:32245:[2017-07-18 05:43:20,950] INFO [GroupCoordinator 2]: Unloading group metadata for transactions-test-consumer-group with generation 0 (kafka.coordinator.group.GroupCoordinator)
{noformat}

On the client: 
{noformat}
[2017-07-18 05:43:20,959] INFO [Transaction Marker Request Completion Handler 1]: Sending my-first-transactional-id's transaction marker from partition __consumer_offsets-47 has failed with  UNSUPPORTED_FOR_MESSAGE_FORMAT. This partition will be removed from the set of partitions waiting for completion (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
{noformat}

As you can see, the client received the response 9 ms after the emigration was initiated on the server.

Since it is perfectly acceptable for the LeaderISR metadata to be propagated asynchronously, we should have more robust handling of emgiration in KafkaApis so that it returns the right error code when handling a request for a partition for which the broker is no longer the leader.

  was:
This bug was revealed by the following system test failure http://confluent-systest.s3-website-us-west-2.amazonaws.com/confluent-kafka-system-test-results/?prefix=2017-07-18--001.1500383975--apache--trunk--28c83d9/

What happened was that a commit marker to the offsets topic (sent as part of the {{producer.sendOffsetsToTransaction}} method) was lost, causing data to be reprocessed, and hence causing the test to fail. 

The bug is that the wrong error code is returned from the handleWriteTxnMarker request when there is partition emigration. In particular, we have: 

{code:scala}
for (marker <- markers.asScala) {
      val producerId = marker.producerId
      val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition =>
        replicaManager.getMagic(partition) match {
          case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
          case _ => false
        }
      }

      if (partitionsWithIncorrectMessageFormat.nonEmpty) {
        val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
        partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
        updateErrors(producerId, currentErrors)
      }
{code}

But the {{replicaManager.getMagic()}} call will return {{None}} when the partition emigrates, causing the {{handleWriteTxnMarkersRequest}} to return an {{UNSUPPORTED_FOR_MESSAGE_FORMAT}} error. 

From the log, we see that the partition did emigrate a few milliseconds before the {{WriteTxnMarkerRequest}} was sent.

On the old leader, worker10:
{noformat}
./worker10/debug/server.log:32245:[2017-07-18 05:43:20,950] INFO [GroupCoordinator 2]: Unloading group metadata for transactions-test-consumer-group with generation 0 (kafka.coordinator.group.GroupCoordinator)
{noformat}

On the client: 
{noformat}
[2017-07-18 05:43:20,959] INFO [Transaction Marker Request Completion Handler 1]: Sending my-first-transactional-id's transaction marker from partition __consumer_offsets-47 has failed with  UNSUPPORTED_FOR_MESSAGE_FORMAT. This partition will be removed from the set of partitions waiting for completion (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
{noformat}

As you can see, the client received the response 9 ms after the emigration was initiated on the server.

Since it is perfectly acceptable for the LeaderISR metadata to be propagated asynchronously, we should have more robust handling of emgiration in KafkaApis so that it returns the right error code when handling a request for a partition for which the broker is no longer the leader.


> KafkaApis.handleWriteTxnMarkerRequest can return UNSUPPORTED_FOR_MESSAGE_FORMAT error on partition emigration
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5610
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.0
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Critical
>             Fix For: 0.11.0.1
>
>
> This bug was revealed by the following system test failure http://confluent-systest.s3-website-us-west-2.amazonaws.com/confluent-kafka-system-test-results/?prefix=2017-07-18--001.1500383975--apache--trunk--28c83d9/
> What happened was that a commit marker to the offsets topic (sent as part of the {{producer.sendOffsetsToTransaction}} method) was lost, causing data to be reprocessed, and hence causing the test to fail. 
> The bug is that the wrong error code is returned from the handleWriteTxnMarker request when there is partition emigration. In particular, we have: 
> {code:java}
> for (marker <- markers.asScala) {
>       val producerId = marker.producerId
>       val (goodPartitions, partitionsWithIncorrectMessageFormat) = marker.partitions.asScala.partition { partition =>
>         replicaManager.getMagic(partition) match {
>           case Some(magic) if magic >= RecordBatch.MAGIC_VALUE_V2 => true
>           case _ => false
>         }
>       }
>       if (partitionsWithIncorrectMessageFormat.nonEmpty) {
>         val currentErrors = new ConcurrentHashMap[TopicPartition, Errors]()
>         partitionsWithIncorrectMessageFormat.foreach { partition => currentErrors.put(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) }
>         updateErrors(producerId, currentErrors)
>       }
> {code}
> But the {{replicaManager.getMagic()}} call will return {{None}} when the partition emigrates, causing the {{handleWriteTxnMarkersRequest}} to return an {{UNSUPPORTED_FOR_MESSAGE_FORMAT}} error. 
> From the log, we see that the partition did emigrate a few milliseconds before the {{WriteTxnMarkerRequest}} was sent.
> On the old leader, worker10:
> {noformat}
> ./worker10/debug/server.log:32245:[2017-07-18 05:43:20,950] INFO [GroupCoordinator 2]: Unloading group metadata for transactions-test-consumer-group with generation 0 (kafka.coordinator.group.GroupCoordinator)
> {noformat}
> On the client: 
> {noformat}
> [2017-07-18 05:43:20,959] INFO [Transaction Marker Request Completion Handler 1]: Sending my-first-transactional-id's transaction marker from partition __consumer_offsets-47 has failed with  UNSUPPORTED_FOR_MESSAGE_FORMAT. This partition will be removed from the set of partitions waiting for completion (kafka.coordinator.transaction.TransactionMarkerRequestCompletionHandler)
> {noformat}
> As you can see, the client received the response 9 ms after the emigration was initiated on the server.
> Since it is perfectly acceptable for the LeaderISR metadata to be propagated asynchronously, we should have more robust handling of emgiration in KafkaApis so that it returns the right error code when handling a request for a partition for which the broker is no longer the leader.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)