Posted to jira@kafka.apache.org by "Mykhailo Baluta (Jira)" <ji...@apache.org> on 2020/09/18 14:30:00 UTC

[jira] [Updated] (KAFKA-10501) Log Cleaner never clean up some __consumer_offsets partitions

     [ https://issues.apache.org/jira/browse/KAFKA-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mykhailo Baluta updated KAFKA-10501:
------------------------------------
    Description: 
Some __consumer_offsets partitions contain "broken" messages in the second log segment.

Example: 
{code:java}
offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 59
offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 59
offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
{code}
It seems the last 2 records are stored in the wrong order. As a result, the last message is transactional and no ABORT/COMMIT marker follows it. This leaves the producer state with an ongoing transaction and firstUncleanableDirtyOffset = 745256543, so compaction is always skipped for such topic partitions.
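
To make the effect concrete, here is a minimal Java sketch (not Kafka's actual code, which is Scala) that replays the four records from the dump above and reports where an unfinished transaction starts. The names OpenTxnScan, Rec, firstOpenTxnOffset and dumpFromIssue are hypothetical. The model assumes that a data record opens a transaction for its producerId if none is open, and that any end marker for that producerId closes it regardless of epoch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

final class OpenTxnScan {
    /** Simplified view of one dumped record; endMarker is null for data records. */
    static final class Rec {
        final long offset;
        final long producerId;
        final String endMarker; // "COMMIT", "ABORT", or null

        Rec(long offset, long producerId, String endMarker) {
            this.offset = offset;
            this.producerId = producerId;
            this.endMarker = endMarker;
        }
    }

    /** Returns the start offset of the earliest transaction that never receives an end marker. */
    static OptionalLong firstOpenTxnOffset(List<Rec> log) {
        Map<Long, Long> openTxnStart = new HashMap<>();
        for (Rec r : log) {
            if (r.endMarker != null) {
                // An end marker closes whatever transaction this producer has open.
                openTxnStart.remove(r.producerId);
            } else {
                // A data record opens a transaction unless one is already open.
                openTxnStart.putIfAbsent(r.producerId, r.offset);
            }
        }
        return openTxnStart.values().stream().mapToLong(Long::longValue).min();
    }

    /** The four records from the dump in this issue (offset, producerId, end marker). */
    static List<Rec> dumpFromIssue() {
        return Arrays.asList(
            new Rec(745253728L, 37146L, null),
            new Rec(745253729L, 37146L, "COMMIT"),
            new Rec(745256523L, 37146L, "ABORT"),
            new Rec(745256543L, 37146L, null));
    }
}
```

With the records in the order shown in the dump, firstOpenTxnOffset(dumpFromIssue()) yields 745256543, the same value as the reported firstUncleanableDirtyOffset. If the last two records were swapped, the ABORT marker would close the transaction and the result would be empty.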
h3. Possible solution

The related log entry looks like:
{code:java}
 WARN Producer's epoch at offset 1060744580 in __consumer_offsets-35 is 0, which is smaller than the last seen epoch 1 (kafka.log.ProducerAppendInfo)
{code}
Related code:
{code:java}
  private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
    if (producerEpoch < updatedEntry.producerEpoch) {
      val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
        s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
      if (origin == AppendOrigin.Replication) {
        warn(message)
      } else {
        throw new ProducerFencedException(message)
      }
    }
  }
{code}
Perhaps an exception should also be thrown when the origin is AppendOrigin.Replication, so that commit messages from an old producer epoch cannot be written to __consumer_offsets topic partitions.
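
The difference between the current behaviour and this suggestion can be sketched in Java as follows. This is a simplified, standalone model, not a patch against Kafka: EpochCheckSketch, StaleEpochException, currentCheck and proposedCheck are hypothetical names, the two-value AppendOrigin enum here is a stand-in for Kafka's own type, and the effect of rejecting appends on the replication path has not been evaluated.

```java
final class EpochCheckSketch {
    /** Stand-in for the append origin; only the two cases relevant here. */
    enum AppendOrigin { CLIENT, REPLICATION }

    /** Hypothetical stand-in for ProducerFencedException. */
    static final class StaleEpochException extends RuntimeException {
        StaleEpochException(String message) {
            super(message);
        }
    }

    private static String describe(short producerEpoch, short lastSeenEpoch) {
        return "Producer's epoch is " + producerEpoch
            + ", which is smaller than the last seen epoch " + lastSeenEpoch;
    }

    /** Models the quoted code: a stale epoch is only logged on the replication path. */
    static boolean currentCheck(short producerEpoch, short lastSeenEpoch, AppendOrigin origin) {
        if (producerEpoch < lastSeenEpoch) {
            if (origin == AppendOrigin.REPLICATION) {
                System.err.println("WARN " + describe(producerEpoch, lastSeenEpoch));
                return true; // the append is still accepted
            }
            throw new StaleEpochException(describe(producerEpoch, lastSeenEpoch));
        }
        return true;
    }

    /** Models the suggestion: a stale epoch is rejected regardless of origin. */
    static boolean proposedCheck(short producerEpoch, short lastSeenEpoch, AppendOrigin origin) {
        if (producerEpoch < lastSeenEpoch) {
            throw new StaleEpochException(describe(producerEpoch, lastSeenEpoch));
        }
        return true;
    }
}
```

For epoch 0 arriving after epoch 1 with origin REPLICATION, currentCheck only warns and accepts the append, while proposedCheck throws.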


> Log Cleaner never clean up some __consumer_offsets partitions
> -------------------------------------------------------------
>
>                 Key: KAFKA-10501
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10501
>             Project: Kafka
>          Issue Type: Bug
>          Components: log, log cleaner
>    Affects Versions: 2.5.0
>            Reporter: Mykhailo Baluta
>            Priority: Major
>


