Posted to jira@kafka.apache.org by "Mykhailo Baluta (Jira)" <ji...@apache.org> on 2020/09/18 14:30:00 UTC
[jira] [Updated] (KAFKA-10501) Log Cleaner never cleans up some __consumer_offsets partitions
[ https://issues.apache.org/jira/browse/KAFKA-10501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhailo Baluta updated KAFKA-10501:
------------------------------------
Description:
Some __consumer_offsets partitions contain "broken" messages in the second log segment.
Example:
{code:java}
offset: 745253728 position: 49793647 CreateTime: 1594539245536 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 745253729 position: 49793844 CreateTime: 1594539245548 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 59
offset: 745256523 position: 50070884 CreateTime: 1594540927673 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 1 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 59
offset: 745256543 position: 50073185 CreateTime: 1594541667798 isvalid: true keysize: 99 valuesize: 28 magic: 2 compresscodec: NONE producerId: 37146 producerEpoch: 0 sequence: 0 isTransactional: true headerKeys: []
{code}
It appears the last two records are stored in the wrong order. As a result, the final message is transactional and is not followed by any ABORT/COMMIT marker. This leaves the producer state with an ongoing transaction and firstUncleanableDirtyOffset = 745256543, so compaction is always skipped for such topic partitions.
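An illustrative sketch (not the actual LogCleanerManager code; the case class and function names here are hypothetical) of why an unfinished transaction pins the first uncleanable offset:

{code:java}
// Hypothetical model: the cleaner may only compact up to the earliest
// offset still inside an open (uncommitted/unaborted) transaction.
case class OngoingTxn(producerId: Long, firstOffset: Long)

// With no COMMIT/ABORT marker after offset 745256543, that transaction
// never closes, so this bound never advances past it.
def firstUncleanableOffset(logEndOffset: Long, ongoing: Seq[OngoingTxn]): Long =
  ongoing.map(_.firstOffset).foldLeft(logEndOffset)(math.min)
{code}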
h3. Possible solution
The related log entry looks like:
{code:java}
WARN Producer's epoch at offset 1060744580 in __consumer_offsets-35 is 0, which is smaller than the last seen epoch 1 (kafka.log.ProducerAppendInfo){code}
Related code:
{code:java}
private def checkProducerEpoch(producerEpoch: Short, offset: Long): Unit = {
  if (producerEpoch < updatedEntry.producerEpoch) {
    val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, which is " +
      s"smaller than the last seen epoch ${updatedEntry.producerEpoch}"
    if (origin == AppendOrigin.Replication) {
      warn(message)
    } else {
      throw new ProducerFencedException(message)
    }
  }
}
{code}
Perhaps an exception should also be thrown in the AppendOrigin.Replication case, to prevent commit messages with a stale producer epoch from being appended to __consumer_offsets topic partitions.
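A minimal, self-contained sketch of that proposed change, assuming the surrounding Kafka types (AppendOrigin, ProducerFencedException, the last-seen-epoch state) are stubbed here for illustration and the epoch is passed in explicitly rather than read from updatedEntry:

{code:java}
sealed trait AppendOrigin
object AppendOrigin {
  case object Client extends AppendOrigin
  case object Replication extends AppendOrigin
}

class ProducerFencedException(msg: String) extends RuntimeException(msg)

// Proposed behavior: fence stale epochs regardless of origin, so a
// transactional batch with an older epoch can never be appended after
// a newer epoch's marker during replication.
def checkProducerEpoch(producerEpoch: Short, lastSeenEpoch: Short,
                       offset: Long, topicPartition: String,
                       origin: AppendOrigin): Unit = {
  if (producerEpoch < lastSeenEpoch) {
    val message = s"Producer's epoch at offset $offset in $topicPartition is $producerEpoch, " +
      s"which is smaller than the last seen epoch $lastSeenEpoch"
    // Previously: warn(message) when origin == AppendOrigin.Replication.
    throw new ProducerFencedException(message)
  }
}
{code}

Whether rejecting replicated batches outright is safe (vs. risking replica divergence from the leader) would need evaluation by the Kafka maintainers.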
> Log Cleaner never cleans up some __consumer_offsets partitions
> --------------------------------------------------------------
>
> Key: KAFKA-10501
> URL: https://issues.apache.org/jira/browse/KAFKA-10501
> Project: Kafka
> Issue Type: Bug
> Components: log, log cleaner
> Affects Versions: 2.5.0
> Reporter: Mykhailo Baluta
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)