Posted to users@kafka.apache.org by Varsha Abhinandan <va...@gmail.com> on 2020/07/21 15:22:15 UTC

Out of range offset errors leading to offset reset

Hi,

We have been noticing offset resets happening on the Kafka consumer because
of an offset out of range error. Following are the INFO logs found on the
consumer side:

[2020-07-17T08:46:00,322Z]  [INFO ]  [pipeline-thread-12
([prd453-19-event-upsert]-bo-pipeline-12)]
[o.a.k.c.consumer.internals.Fetcher]  [Consumer
clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of
range for partition prd453-19-event-upsert-32, resetting offset

[2020-07-17T08:46:00,330Z]  [INFO ]  [pipeline-thread-12
([prd453-19-event-upsert]-bo-pipeline-12)]
[o.a.k.c.consumer.internals.Fetcher]  [Consumer
clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544,
groupId=bo-indexer-group-prd453-19] Resetting offset for partition
prd453-19-event-upsert-32 to offset 453223789.

However, I don't see any errors in the broker logs (only INFO logging was
enabled in the production environment), and nothing related to leader
election, replica lag, or Kafka broker pod restarts. Following are the logs
on 2 brokers for the partition whose offset got reset.

Kafka-6

[2020-07-17T07:40:12,092Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:40:12,641Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,641Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:41:12,642Z]  [INFO ]  [kafka-scheduler-0]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,712Z]  [INFO ]  [kafka-scheduler-0]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,713Z]  [INFO ]  [kafka-scheduler-0]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,713Z]  [INFO ]  [kafka-scheduler-0]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,839Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,840Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 2 ms.
[2020-07-17T09:05:12,085Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:05:12,640Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,640Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:06:12,641Z]  [INFO ]  [kafka-scheduler-3]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,697Z]  [INFO ]  [kafka-scheduler-3]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,697Z]  [INFO ]  [kafka-scheduler-3]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,697Z]  [INFO ]  [kafka-scheduler-3]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,684Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,685Z]  [INFO ]  [ReplicaFetcherThread-2-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.

Kafka-7

[2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,082Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:40:12,083Z]  [INFO ]  [kafka-scheduler-4]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:41:12,083Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,114Z]  [INFO ]  [kafka-scheduler-7]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,836Z]  [INFO ]  [data-plane-kafka-request-handler-3]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms.
[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:05:12,075Z]  [INFO ]  [kafka-scheduler-2]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:06:12,075Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,108Z]  [INFO ]  [kafka-scheduler-6]  [kafka.log.LogSegment]  Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,682Z]  [INFO ]  [data-plane-kafka-request-handler-1]  [kafka.log.ProducerStateManager]  [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,683Z]  [INFO ]  [data-plane-kafka-request-handler-1]  [kafka.log.Log]  [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.

As observed in the logs, the fetch offset 476383711 is not lower than the
log start offset (453223789 at the time of the error), so the reset cannot
be explained by retention having deleted the consumer's position. It appears
that the out of range error was because the fetch offset was larger than the
log end offset on the broker; the brokers rolled a new segment at offset
475609786 at 07:52 and the next roll at 09:06 was at offset 476681303, so
offset 476383711 was only produced to the active segment somewhere in that
window, which is consistent with the fetch pointing past the log end offset
at 08:46.
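
To double-check this the next time it happens, we plan to compare the
partition's live offset range against the failing fetch offset with a small
probe like the sketch below (the bootstrap address is a placeholder; the
topic and partition are the ones from the logs above):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class OffsetRangeProbe {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Placeholder bootstrap address; no group.id is needed for offset lookups.
            props.put("bootstrap.servers", "kafka-6:9092");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());

            TopicPartition tp = new TopicPartition("prd453-19-event-upsert", 32);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                Map<TopicPartition, Long> start = consumer.beginningOffsets(Collections.singleton(tp));
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));
                // A fetch offset below the start offset or above the log end offset
                // is what triggers OFFSET_OUT_OF_RANGE on the broker.
                System.out.printf("log start offset=%d, log end offset=%d%n",
                        start.get(tp), end.get(tp));
            }
        }
    }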

We would like to understand the cause of these errors, because none of the
offset reset options is desirable. Choosing "earliest" suddenly creates a
huge lag (we have a retention of 24 hours), and choosing "latest" leads to
data loss (the records produced between the out of range error and the
offset reset on the consumer are skipped).
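
The workaround we are evaluating in the meantime is auto.offset.reset=none
plus a manual recovery policy, roughly like the sketch below. The
timestamp-based recovery point is our own idea, not something the client
library prescribes; recoveryPointMs stands in for whatever point we decide
to trust:

    import java.time.Duration;
    import java.util.Map;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;

    public class ManualResetPolicy {
        // With auto.offset.reset=none, poll() throws OffsetOutOfRangeException
        // instead of silently jumping to earliest/latest, so the application
        // can choose its own recovery point.
        static void pollLoop(KafkaConsumer<byte[], byte[]> consumer, long recoveryPointMs) {
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                    // ... hand records to the pipeline ...
                } catch (OffsetOutOfRangeException e) {
                    // Resolve the affected partitions to the offsets at a timestamp
                    // we trust (recoveryPointMs is a placeholder for that decision).
                    Map<TopicPartition, Long> query = e.partitions().stream()
                            .collect(Collectors.toMap(tp -> tp, tp -> recoveryPointMs));
                    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
                            consumer.offsetsForTimes(query).entrySet()) {
                        if (entry.getValue() != null) {
                            consumer.seek(entry.getKey(), entry.getValue().offset());
                        }
                    }
                }
            }
        }
    }

That would avoid both the sudden huge lag of "earliest" and the data loss of
"latest", at the cost of owning the recovery decision ourselves.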

Kafka version : 2.2.2
Number of brokers : 9
Log retention of the topic : 24 hours
Kafka configs :
        advertised.host.name = 10.xx.xx.xx
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name =
        auto.create.topics.enable = false
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 6
        broker.id.generation.enable = true
        broker.rack = null
        client.quota.callback.class = null
        compression.type = producer
        connection.failed.authentication.delay.ms = 100
        connections.max.idle.ms = 600000
        connections.max.reauth.ms = 0
        control.plane.listener.name = null
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 3
        delegation.token.expiry.check.interval.ms = 3600000
        delegation.token.expiry.time.ms = 86400000
        delegation.token.master.key = null
        delegation.token.max.lifetime.ms = 604800000
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 100
        group.initial.rebalance.delay.ms = 3000
        group.max.session.timeout.ms = 300000
        group.max.size = 2147483647
        group.min.session.timeout.ms = 6000
        host.name = 0.0.0.0
        inter.broker.listener.name = null
        inter.broker.protocol.version = 2.2-IV1
        kafka.metrics.polling.interval.secs = 10
        kafka.metrics.reporters = []
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map =
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 5.24288E7
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /data/kafka
        log.flush.interval.messages = 20000
        log.flush.interval.ms = 10000
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 2000
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.downconversion.enable = true
        log.message.format.version = 2.2-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 24
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        max.incremental.fetch.session.cache.slots = 1000
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 8
        num.partitions = 20
        num.recovery.threads.per.data.dir = 4
        num.replica.alter.log.dirs.threads = null
        num.replica.fetchers = 4
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 10080
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
        password.encoder.iterations = 4096
        password.encoder.key.length = 128
        password.encoder.keyfactory.algorithm = null
        password.encoder.old.secret = null
        password.encoder.secret = null
        port = 6667
        principal.builder.class = null
        producer.purgatory.purge.interval.requests = 100
        queued.max.request.bytes = -1
        queued.max.requests = 16
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 25000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 28000
        reserved.broker.max.id = 400000000
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.inter.broker.protocol = GSSAPI
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 1048576
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 1048576
        ssl.cipher.suites = []
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.principal.mapping.rules = [DEFAULT]
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 3
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect =
cxx-zookeeper-svc.ana-zookeeper.svc.cluster.local:2181/xxxx/analytics/kafka/c19-v1
        zookeeper.connection.timeout.ms = 10000
        zookeeper.max.in.flight.requests = 10
        zookeeper.session.timeout.ms = 15000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000

Thanks,
Varsha