Posted to jira@kafka.apache.org by "Varsha Abhinandan (Jira)" <ji...@apache.org> on 2020/07/27 11:57:00 UTC

[jira] [Created] (KAFKA-10313) Out of range offset errors leading to offset reset

Varsha Abhinandan created KAFKA-10313:
-----------------------------------------

             Summary: Out of range offset errors leading to offset reset
                 Key: KAFKA-10313
                 URL: https://issues.apache.org/jira/browse/KAFKA-10313
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 2.2.2
            Reporter: Varsha Abhinandan


Hi,
 
We have been occasionally noticing offset resets happening on the Kafka consumer because of an offset-out-of-range error. However, I don't see any errors in the broker logs: nothing related to leader election, replica lag, or Kafka broker pod restarts (only INFO logs were enabled in the prod environment).
 
It appeared from the logs that the out-of-range error was caused by the fetch offset being larger than the offset range on the broker. We noticed this happening multiple times on different consumers and stream apps in the prod environment, so it seems less like an application bug and more like a bug in the KafkaConsumer. We would like to understand the cause of such errors.
 
Also, none of the offset reset options are desirable. Choosing "earliest" creates a sudden huge lag (we have a retention of 24 hours), and choosing "latest" leads to data loss (the records produced between the out-of-range error and the offset reset on the consumer). So we are wondering whether it would be better for the Kafka client to apply the 'auto.offset.reset' config only to the case where no committed offset is found. For an out-of-range error, the Kafka client could instead automatically reset the offset to latest when the fetch offset is higher than the end offset (to prevent data loss), and to earliest when the fetch offset is lower than the start offset.
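Until something like that exists in the client, the closest workaround we can think of is to set auto.offset.reset=none and implement the split policy in the application. Below is only a rough sketch of that idea (the bootstrap servers, group id, deserializers and class name are placeholders, and a real application would also have to handle NoOffsetForPartitionException for the initial position):

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

public class ManualResetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bo-indexer-group-prd453-19"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Disable the built-in reset so out-of-range fetches surface as exceptions.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("prd453-19-event-upsert"));
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    // ... process records ...
                } catch (OffsetOutOfRangeException e) {
                    // Fetch offsets the broker rejected, per partition.
                    Map<TopicPartition, Long> bad = e.offsetOutOfRangePartitions();
                    Map<TopicPartition, Long> start = consumer.beginningOffsets(bad.keySet());
                    Map<TopicPartition, Long> end = consumer.endOffsets(bad.keySet());
                    for (Map.Entry<TopicPartition, Long> entry : bad.entrySet()) {
                        TopicPartition tp = entry.getKey();
                        long fetchOffset = entry.getValue();
                        if (fetchOffset < start.get(tp)) {
                            // Behind the log start offset (segments deleted): jump to earliest.
                            consumer.seek(tp, start.get(tp));
                        } else {
                            // Ahead of the log end offset: fall back to latest.
                            consumer.seek(tp, end.get(tp));
                        }
                    }
                }
            }
        }
    }
}

This is obviously a lot more code than a config option, which is why a separate reset behaviour for the out-of-range case in the client itself would be nicer.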
 
 
Following are the logs on the consumer side :
 
[2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range for partition prd453-19-event-upsert-32, resetting offset
[2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Resetting offset for partition prd453-19-event-upsert-32 to offset 453223789.
 
Broker logs for the partition :
[2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:40:12,083Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:41:12,083Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms.

[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:06:12,075Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,682Z]  [INFO ]  [data-plane-kafka-request-handler-1]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,683Z]  [INFO ]  [data-plane-kafka-request-handler-1]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.
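For reference, the partition's log start and end offsets can be compared against the fetch offset from the consumer error with a small standalone consumer. A minimal sketch (bootstrap servers are a placeholder; note that beginningOffsets/endOffsets reflect the range at the time the check runs, not at the time of the error):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class CheckOffsetRange {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        TopicPartition tp = new TopicPartition("prd453-19-event-upsert", 32);
        long fetchOffset = 476383711L; // offset reported in the consumer error above

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> start = consumer.beginningOffsets(Collections.singleton(tp));
            Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
            System.out.printf("log start=%d, log end=%d, fetch=%d, in range=%b%n",
                    start.get(tp), end.get(tp), fetchOffset,
                    fetchOffset >= start.get(tp) && fetchOffset < end.get(tp));
        }
    }
}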


Kafka version : 2.2.2
Number of brokers : 9
Log retention of the topic : 24 hours
Kafka configs :
       
        unclean.leader.election.enable = false
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id.generation.enable = true
        connection.failed.authentication.delay.ms = 100
        connections.max.idle.ms = 600000
        default.replication.factor = 3
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        group.initial.rebalance.delay.ms = 3000
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        inter.broker.protocol.version = 2.2-IV1
        kafka.metrics.polling.interval.secs = 10
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 5.24288E7
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /data/kafka
        log.flush.interval.messages = 20000
        log.flush.interval.ms = 10000
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 2000
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.downconversion.enable = true
        log.message.format.version = 2.2-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 24
        log.roll.hours = 168
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.incremental.fetch.session.cache.slots = 1000
        message.max.bytes = 1000012
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 8
        num.partitions = 20
        num.recovery.threads.per.data.dir = 4
        num.replica.alter.log.dirs.threads = null
        num.replica.fetchers = 4
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 10080
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        producer.purgatory.purge.interval.requests = 100
        queued.max.request.bytes = -1
        queued.max.requests = 16
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 25000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        request.timeout.ms = 28000


