Posted to dev@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/22 23:42:04 UTC

[jira] [Commented] (KAFKA-5273) KafkaConsumer.committed() should get latest committed offsets from the server

    [ https://issues.apache.org/jira/browse/KAFKA-5273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020408#comment-16020408 ] 

ASF GitHub Bot commented on KAFKA-5273:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/3119

    KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions

    Before this patch, the consumer returned cached offsets for partitions in its current assignment. This worked as long as all offset commits went through the consumer.
    
    With KIP-98, offsets can be committed transactionally through the producer. Relying on the offsets cached in the consumer then returns incorrect information: since those commits go through the producer, the consumer's cache is never updated.
    
    Hence we need to update the `KafkaConsumer.committed` method to always query the server for the last committed offset, so that it returns the correct information every time.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-5273-kafkaconsumer-committed-should-always-hit-server

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3119
    
----
commit 17eb7eab70a40e3d4208a56463bb418350f80950
Author: Apurva Mehta <ap...@confluent.io>
Date:   2017-05-22T23:36:38Z

    Make KafkaConsumer.committed hit the server for all partitions, even those in its current assignment

----


> KafkaConsumer.committed() should get latest committed offsets from the server
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-5273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5273
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, core, producer 
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>
> Currently, `KafkaConsumer.committed(topicPartition)` will return the current position of the consumer for that partition if the consumer has been assigned the partition. Otherwise, it will look up the committed position from the server. 
> With the new producer `sendOffsetsToTransaction` API, we can get into a state where the offsets for an assigned partition are committed through the producer. In that case the consumer doesn't update its cached view and subsequently returns a stale committed offset for its assigned partition. 
> We should either update the consumer's cache when offsets are committed through the producer, or drop the cache entirely and always query the server for the committed offset. Either way, the `committed` method will always return the latest committed offset for any partition.
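
To illustrate the stale-cache problem described above, here is a minimal toy model in Java. It does not use the Kafka client library: `OffsetStore`, `CachingConsumer`, `committedCached`, `committedFromServer`, and the partition name `orders-0` are all hypothetical names invented for this sketch, and the "server" is just an in-memory map standing in for the broker-side committed offsets. It only shows why a cache-first lookup goes stale once commits can bypass the consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these classes belong to Kafka.
public class CommittedOffsetSketch {

    /** Stand-in for the broker-side committed offsets, keyed by "topic-partition". */
    static final class OffsetStore {
        private final Map<String, Long> committed = new HashMap<>();

        void commit(String tp, long offset) { committed.put(tp, offset); }

        Long fetch(String tp) { return committed.get(tp); }
    }

    /** Hypothetical consumer that caches the offsets it commits itself. */
    static final class CachingConsumer {
        private final OffsetStore server;
        private final Map<String, Long> cache = new HashMap<>();

        CachingConsumer(OffsetStore server) { this.server = server; }

        /** A commit made through the consumer updates both the server and the cache. */
        void commitSync(String tp, long offset) {
            server.commit(tp, offset);
            cache.put(tp, offset);
        }

        /** Cache-first lookup: use the cache if present, else ask the server. */
        Long committedCached(String tp) {
            Long cached = cache.get(tp);
            return cached != null ? cached : server.fetch(tp);
        }

        /** Server-only lookup: ignore the cache and always ask the server. */
        Long committedFromServer(String tp) { return server.fetch(tp); }
    }

    /** Runs the scenario and returns {cache-first result, server-only result}. */
    public static long[] demo() {
        OffsetStore server = new OffsetStore();
        CachingConsumer consumer = new CachingConsumer(server);
        String tp = "orders-0"; // hypothetical assigned partition

        consumer.commitSync(tp, 10L); // commit through the consumer
        server.commit(tp, 25L);       // commit that bypasses the consumer's cache,
                                      // as a producer-side transactional commit would

        return new long[] {
            consumer.committedCached(tp),
            consumer.committedFromServer(tp)
        };
    }

    public static void main(String[] args) {
        long[] result = demo();
        System.out.println("cache-first lookup: " + result[0]); // prints 10 (stale)
        System.out.println("server lookup:      " + result[1]); // prints 25
    }
}
```

In this model the cache-first lookup keeps returning 10 after the out-of-band commit of 25, while the server-only lookup returns 25. The trade-off of dropping the cache is an extra round trip to the server on every call.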



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)