You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Natan Silnitsky (JIRA)" <ji...@apache.org> on 2019/05/13 14:44:00 UTC

[jira] [Created] (KAFKA-8358) KafkaConsumer.endOffsets should be able to also return end offsets while not ignoring control records

Natan Silnitsky created KAFKA-8358:
--------------------------------------

             Summary: KafkaConsumer.endOffsets should be able to also return end offsets while not ignoring control records
                 Key: KAFKA-8358
                 URL: https://issues.apache.org/jira/browse/KAFKA-8358
             Project: Kafka
          Issue Type: Improvement
            Reporter: Natan Silnitsky


We have a use case where we have a wrapper on top of {{kafkaConsumer}} for compact logs.
In order to know that a user can get "new" values for a key in the compact log, on init, or on rebalance, we need to block until all "old" values were read.

We wanted to use {{KafkaConsumer.endOffsets}} to help us find out where the "old" values end.
once all "old" values arrive from {{KafkaConsumer.poll}}, we can release the blocking on getting new values.

But it seems that [control records|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java#L128] are not received in {{KafkaConsumer.poll }} but are taking into account for {{KafkaConsumer.endOffsets }}

So the Feature request is for {{KafkaConsumer.endOffsets}} to have a flag to ignore control records, the same way that {{KafkaConsumer.poll }} ignores them.



(From a quick review of the code, it seems that {{LeaderEpochFile}}.[assign|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala#L51] can be given the flag isControl from [batch.isControlBatch|https://github.com/apache/kafka/blob/c09e25fac2aaea61af892ae3e5273679a4bdbc7d/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java#L239]

But I'm maybe wrong with my understanding there...)

CC:
[~berman7] [~berman]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)