You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Natan Silnitsky (JIRA)" <ji...@apache.org> on 2019/05/13 14:44:00 UTC
[jira] [Created] (KAFKA-8358) KafkaConsumer.endOffsets should be
able to also return end offsets while not ignoring control records
Natan Silnitsky created KAFKA-8358:
--------------------------------------
Summary: KafkaConsumer.endOffsets should be able to also return end offsets while not ignoring control records
Key: KAFKA-8358
URL: https://issues.apache.org/jira/browse/KAFKA-8358
Project: Kafka
Issue Type: Improvement
Reporter: Natan Silnitsky
We have a use case where we have a wrapper on top of {{kafkaConsumer}} for compact logs.
In order to know that a user can get "new" values for a key in the compact log, on init, or on rebalance, we need to block until all "old" values were read.
We wanted to use {{KafkaConsumer.endOffsets}} to help us find out where the "old" values end.
once all "old" values arrive from {{KafkaConsumer.poll}}, we can release the blocking on getting new values.
But it seems that [control records|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L128] are not received in {{KafkaConsumer.poll }} but are taking into account for {{KafkaConsumer.endOffsets }}
So the Feature request is for {{KafkaConsumer.endOffsets}} to have a flag to ignore control records, the same way that {{KafkaConsumer.poll }} ignores them.
(From a quick review of the code, it seems that {{LeaderEpochFile}}.[assign|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L51] can be given the flag isControl from [batch.isControlBatch|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java#L239]
But I'm maybe wrong with my understanding there...)
CC:
[~berman7] [~berman]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)