Posted to dev@kafka.apache.org by "Ivan Bondarenko (Jira)" <ji...@apache.org> on 2021/03/21 19:34:00 UTC

[jira] [Created] (KAFKA-12516) Standardize KafkaConsumer's commitSync/commitAsync/committed

Ivan Bondarenko created KAFKA-12516:
---------------------------------------

             Summary: Standardize KafkaConsumer's commitSync/commitAsync/committed
                 Key: KAFKA-12516
                 URL: https://issues.apache.org/jira/browse/KAFKA-12516
             Project: Kafka
          Issue Type: Improvement
          Components: consumer
    Affects Versions: 2.7.0
            Reporter: Ivan Bondarenko


The commit-related methods of KafkaConsumer are asymmetrical.

The main axis of asymmetry is the following type (let's call it "MAP"):
{code:java}
Map<TopicPartition, OffsetAndMetadata>
{code}
 

To illustrate the asymmetry, let's put the methods into a table (skipping the deprecated ones and the overloads that take a Duration, to keep the table short).

Before that, let's use "SAC" as the name for what this code returns:
{code:java}
subscriptions.allConsumed()
{code}
 

So the table is...
||name||params||returns||is SAC used?||can we get MAP?||
|committed|{color:#de350b}Set<TP>{color}|MAP|no|via return|
|commitSync|--|--|used|{color:#de350b}no{color}|
|commitSync|MAP|--|no|no|
|commitAsync|--|--|no|no|
|commitAsync|callback|--|used|SAC via callback|
|commitAsync|MAP, callback|--|no|no|

What is asymmetric here...

1. *Main problem.* SAC can be retrieved through only one method, and that method is async. So without workarounds we cannot simply sync-commit (without parameters) and see what was committed.
 To solve this we can do either or both of the following:
 * Introduce a new method like "getAllConsumed()" which returns SAC.
 * Make commitSync() without params return SAC instead of void.
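Until something like that exists, the usual workaround is to build the MAP ourselves, pass it to the commitSync overload that takes a MAP, and keep it. Below is a minimal sketch of that idea. The class and method names are hypothetical (not part of Kafka), and the helper is written against plain functional interfaces so that it stands on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical helper, not part of Kafka: commits an explicit map and returns it. */
final class CommitSyncSketch {
    private CommitSyncSketch() {}

    /**
     * Builds the offsets map from the current positions, commits it
     * synchronously, and hands the same map back to the caller.
     *
     * @param assignment partitions to commit
     * @param position   offset to commit for a given partition
     * @param commitSync synchronous commit that accepts an explicit map
     */
    static <P, O> Map<P, O> commitSyncAndReturn(
            Set<P> assignment,
            Function<P, O> position,
            Consumer<Map<P, O>> commitSync) {
        Map<P, O> offsets = new LinkedHashMap<>();
        for (P partition : assignment) {
            offsets.put(partition, position.apply(partition));
        }
        // If the commit throws, the exception propagates and nothing is returned.
        commitSync.accept(offsets);
        return offsets;
    }
}
```

With a real consumer the call could look like commitSyncAndReturn(consumer.assignment(), tp -> new OffsetAndMetadata(consumer.position(tp)), offsets -> consumer.commitSync(offsets)). Note that a map built this way is only an approximation of SAC: it is derived from positions, so it may not carry the same metadata.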

2. *Minor.* There is no "committed()" without params, so we need to create a Set manually.
Solution:
 * We can add "committed()" without params. In this case the default set would be constructed from one of:
 ** SAC
 ** all partitions of the subscribed topics
 ** the assignment
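For the "assignment" variant, the behaviour can already be emulated from the caller's side by passing the current assignment to the existing committed(Set) method. A minimal sketch of that is below. The class and method names are hypothetical, and the helper takes plain functional interfaces rather than a real consumer so that it stands on its own.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Kafka: a parameterless committed() based on the assignment. */
final class CommittedSketch {
    private CommittedSketch() {}

    /**
     * Looks up committed offsets for whatever the assignment currently is.
     *
     * @param assignment supplies the default set of partitions
     * @param committed  lookup of committed offsets for an explicit set
     */
    static <P, O> Map<P, O> committedForAssignment(
            Supplier<Set<P>> assignment,
            Function<Set<P>, Map<P, O>> committed) {
        return committed.apply(assignment.get());
    }
}
```

With a real consumer this boils down to consumer.committed(consumer.assignment()). The other two candidates for the default set (SAC, or all partitions of the subscribed topics) would need a different supplier.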

3. *Looks like a minor bug.* This piece of code is used in all commitAsync() methods but not in all commitSync() methods, which doesn't look intentional:

 
{code:java}
offsets.forEach(this::updateLastSeenEpochIfNewer);
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)