Posted to users@kafka.apache.org by Varsha Abhinandan <va...@gmail.com> on 2020/07/21 15:22:15 UTC
Out of range offset errors leading to offset reset
Hi,
We have been noticing offset resets on our Kafka consumers caused by out-of-range offset errors. Following are the INFO logs found on the consumer side:
[2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range for partition prd453-19-event-upsert-32, resetting offset
[2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Resetting offset for partition prd453-19-event-upsert-32 to offset 453223789.
However, I don't see any errors in the broker logs (only INFO-level logging is enabled in the production environment). There are no logs related to leader election, replica lag, or Kafka broker pod restarts. Following are the logs from the 2 brokers for the partition whose offset got reset.
Kafka-6
[2020-07-17T07:40:12,092Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:40:12,641Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,641Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:41:12,642Z] [INFO ] [kafka-scheduler-0] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,712Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,713Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,713Z] [INFO ] [kafka-scheduler-0] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,839Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,840Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 2 ms.
[2020-07-17T09:05:12,085Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:05:12,640Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,640Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:06:12,641Z] [INFO ] [kafka-scheduler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,697Z] [INFO ] [kafka-scheduler-3] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,684Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,685Z] [INFO ] [ReplicaFetcherThread-2-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.
Kafka-7
[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
[2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
[2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
[2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
[2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
[2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms.
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [453223789] due to retention time 86400000ms breach
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 453223789, size 1073741355] for deletion.
[2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 454388428
[2020-07-17T09:06:12,075Z] [INFO ] [kafka-scheduler-6] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 453223789
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000453223789.log.deleted.
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.index.deleted.
[2020-07-17T09:06:12,108Z] [INFO ] [kafka-scheduler-6] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000453223789.timeindex.deleted.
[2020-07-17T09:06:56,682Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 476681303
[2020-07-17T09:06:56,683Z] [INFO ] [data-plane-kafka-request-handler-1] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 476681303 in 1 ms.
As observed in the logs, the fetch offset 476383711 is not lower than the log start offset (453223789 at the time), so the data cannot simply have been removed by retention. It appears the out-of-range error occurred because the fetch offset was larger than the log end offset on the broker.
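To make that reasoning concrete, here is a simplified Python sketch of the range check the broker effectively performs (this is illustrative, not Kafka code). The two log-end values used below are only brackets on the actual, unlogged log end offset at 08:46: they are the base offsets of the segments rolled at 07:52 and 09:06 in the broker logs above.

```python
def is_out_of_range(fetch_offset, log_start_offset, log_end_offset):
    """A fetch offset is accepted only if it lies within
    [logStartOffset, logEndOffset]; anything else is OFFSET_OUT_OF_RANGE."""
    return fetch_offset < log_start_offset or fetch_offset > log_end_offset

fetch = 476383711       # offset the consumer was fetching at 08:46
log_start = 453223789   # log start offset at that time (per broker logs)

# The fetch offset is well above the log start offset, so retention
# truncation alone cannot explain the error:
print(is_out_of_range(fetch, log_start, log_end_offset=476681303))  # False
# It is only out of range if the log end offset was still below it,
# e.g. at the 07:52 segment roll point:
print(is_out_of_range(fetch, log_start, log_end_offset=475609786))  # True
```

This is why the error is puzzling: by the visible broker state, the offset falls inside the retained range unless the log end offset briefly lagged behind it.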
We would like to understand the cause of these errors, because none of the offset reset options is desirable for us. Choosing "earliest" creates a sudden, huge lag (we have a retention of 24 hours), while choosing "latest" leads to data loss (the records produced between the out-of-range error and the moment the offset reset happens on the consumer are skipped).
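One possible mitigation (a sketch only, not something we run in production): with auto.offset.reset=none the consumer raises OffsetOutOfRangeException instead of resetting silently, and the application can then pick its own target offset, for example by clamping the stale offset into the currently valid range so that an offset slightly past the log end resumes near the end instead of rewinding 24 hours. The helper below is hypothetical:

```python
def choose_reset_offset(stale_offset, log_start_offset, log_end_offset):
    """Clamp a rejected fetch offset into [logStartOffset, logEndOffset].

    stale < start: retention deleted the data -> resume at earliest available
    stale > end:   offset ran past the log    -> resume at latest, not earliest
    otherwise:     the offset is valid, keep it as-is
    """
    return max(log_start_offset, min(stale_offset, log_end_offset))

# With the offsets from this incident, assuming (hypothetically) that the
# log end offset at the time of the error was 476000000:
print(choose_reset_offset(476383711, 453223789, 476000000))  # 476000000
```

Note that when the stale offset is above the log end this still behaves like "latest" for the gap, so it only avoids the lag side of the trade-off; the sketch shows where a custom policy could hook in, not a fix for the underlying cause.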
Kafka version : 2.2.2
Number of brokers : 9
Log retention of the topic : 24 hours
Kafka configs :
advertised.host.name = 10.xx.xx.xx
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = false
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 6
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 3
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 100
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 300000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name = 0.0.0.0
inter.broker.listener.name = null
inter.broker.protocol.version = 2.2-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map =
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 5.24288E7
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /data/kafka
log.flush.interval.messages = 20000
log.flush.interval.ms = 10000
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 2000
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.2-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 24
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 8
num.partitions = 20
num.recovery.threads.per.data.dir = 4
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 4
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 6667
principal.builder.class = null
producer.purgatory.purge.interval.requests = 100
queued.max.request.bytes = -1
queued.max.requests = 16
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 25000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 28000
reserved.broker.max.id = 400000000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 1048576
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 1048576
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = [DEFAULT]
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 3
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect =
cxx-zookeeper-svc.ana-zookeeper.svc.cluster.local:2181/xxxx/analytics/kafka/c19-v1
zookeeper.connection.timeout.ms = 10000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 15000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
Thanks,
Varsha