Posted to dev@kafka.apache.org by "Ivan Bondarenko (Jira)" <ji...@apache.org> on 2021/03/21 19:34:00 UTC
[jira] [Created] (KAFKA-12516) Standardize KafkaConsumer's
commitSync/commitAsync/committed
Ivan Bondarenko created KAFKA-12516:
---------------------------------------
Summary: Standardize KafkaConsumer's commitSync/commitAsync/committed
Key: KAFKA-12516
URL: https://issues.apache.org/jira/browse/KAFKA-12516
Project: Kafka
Issue Type: Improvement
Components: consumer
Affects Versions: 2.7.0
Reporter: Ivan Bondarenko
The commit-related methods of KafkaConsumer are asymmetric.
The main axis of asymmetry is the following type (let's call it "MAP"):
{code:java}
Map<TopicPartition, OffsetAndMetadata>
{code}
To illustrate the asymmetry, let's put the methods into a table (skipping the deprecated ones and the overloads that take a Duration, to keep the table short).
Before that, let's use "SAC" as the name for what this code returns:
{code:java}
subscriptions.allConsumed()
{code}
So the table is...
||name||params||returns||is SAC used?||can we get MAP?||
|committed|{color:#de350b}Set<TP>{color}|MAP|no|via return|
|commitSync|--|--|used|{color:#de350b}no{color}|
|commitSync|MAP|--|no|no|
|commitAsync|--|--|no|no|
|commitAsync|callback|--|used|SAC via callback|
|commitAsync|MAP, callback|--|no|no|
What is asymmetric here...
1. *Main problem.* SAC can be retrieved through only one method, and that method is asynchronous. So, without workarounds, we cannot simply call commitSync() without parameters and see what was committed.
Possible solutions (more than one can be applied):
* Introduce a new method such as "getAllConsumed()" that returns SAC.
* Make commitSync() without parameters return SAC instead of void.
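Until either change exists, one caller-side workaround can be sketched as follows: build MAP by hand from the current positions and pass it to the commitSync(MAP) overload, so that the caller keeps the map it committed. This is only a sketch and does not use kafka-clients. The records TopicPartition and OffsetAndMetadata and the interface CommitApi are simplified stand-ins that mirror only KafkaConsumer's assignment(), position(TopicPartition) and commitSync(Map); commitSyncAndReturn and FakeConsumer are hypothetical names. The result only approximates SAC, assuming every assigned partition has a valid position.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class CommitSyncSketch {
    // Simplified stand-ins for the Kafka classes of the same name (assumption).
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    // Mirrors only the three KafkaConsumer methods this sketch relies on.
    interface CommitApi {
        Set<TopicPartition> assignment();
        long position(TopicPartition tp);
        void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
    }

    // Hypothetical helper: commit the current positions and return what was sent.
    static Map<TopicPartition, OffsetAndMetadata> commitSyncAndReturn(CommitApi consumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(offsets);
        return offsets;
    }

    // In-memory fake, used only to exercise the helper without a broker.
    static class FakeConsumer implements CommitApi {
        final Map<TopicPartition, Long> positions = new HashMap<>();
        final Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();

        public Set<TopicPartition> assignment() { return positions.keySet(); }
        public long position(TopicPartition tp) { return positions.get(tp); }
        public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
            committed.putAll(offsets);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.positions.put(new TopicPartition("orders", 0), 42L);
        // The caller now holds exactly the map that was committed.
        Map<TopicPartition, OffsetAndMetadata> sent = commitSyncAndReturn(consumer);
        System.out.println(sent);
    }
}
```

The cost of this workaround is that every caller has to repeat the loop, which is what returning SAC from commitSync() would avoid.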
2. *Minor.* There is no "committed()" without parameters, so we need to build a Set manually.
Solutions:
* We can add "committed()" without parameters. In this case the default set would be built from one of:
** SAC
** all partitions of the subscribed topics
** assignment
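The assignment-based option can already be approximated on the caller side. A minimal sketch, using stand-in types rather than kafka-clients: CommittedApi mirrors only KafkaConsumer's assignment() and committed(Set<TopicPartition>), while committedForAssignment and FakeConsumer are hypothetical names. In this fake, a partition with no committed offset maps to null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class CommittedSketch {
    // Simplified stand-ins for the Kafka classes of the same name (assumption).
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset) {}

    // Mirrors only the two KafkaConsumer methods this sketch relies on.
    interface CommittedApi {
        Set<TopicPartition> assignment();
        Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions);
    }

    // Hypothetical no-argument variant: defaults the set to the current assignment.
    static Map<TopicPartition, OffsetAndMetadata> committedForAssignment(CommittedApi consumer) {
        return consumer.committed(consumer.assignment());
    }

    // In-memory fake, used only to exercise the helper without a broker.
    static class FakeConsumer implements CommittedApi {
        final Set<TopicPartition> assigned;
        final Map<TopicPartition, OffsetAndMetadata> store;

        FakeConsumer(Set<TopicPartition> assigned, Map<TopicPartition, OffsetAndMetadata> store) {
            this.assigned = assigned;
            this.store = store;
        }

        public Set<TopicPartition> assignment() { return assigned; }

        public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
            Map<TopicPartition, OffsetAndMetadata> result = new HashMap<>();
            for (TopicPartition tp : partitions) {
                result.put(tp, store.get(tp)); // null when nothing was committed
            }
            return result;
        }
    }

    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("orders", 0);
        TopicPartition p1 = new TopicPartition("orders", 1);
        FakeConsumer consumer =
            new FakeConsumer(Set.of(p0, p1), Map.of(p0, new OffsetAndMetadata(7L)));
        System.out.println(committedForAssignment(consumer));
    }
}
```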
3. *Looks like a minor bug.* This piece of code is used in all commitAsync() methods but not in all commitSync() methods, which does not look intentional:
{code:java}
offsets.forEach(this::updateLastSeenEpochIfNewer);
{code}
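For readers unfamiliar with that call, here is a rough sketch of what it presumably does, judging by the method name: for every offset being committed, remember the leader epoch if it is newer than the one seen so far. This is an assumption about the behaviour, not a copy of Kafka's implementation. The records and the lastSeenEpochs map are simplified stand-ins.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class EpochSketch {
    // Simplified stand-ins for the Kafka classes of the same name (assumption).
    record TopicPartition(String topic, int partition) {}
    record OffsetAndMetadata(long offset, Optional<Integer> leaderEpoch) {}

    // Highest leader epoch seen so far, per partition (stand-in for consumer metadata).
    final Map<TopicPartition, Integer> lastSeenEpochs = new HashMap<>();

    // Keeps the larger of the stored epoch and the epoch carried by the offset, if any.
    void updateLastSeenEpochIfNewer(TopicPartition tp, OffsetAndMetadata om) {
        if (om == null) {
            return;
        }
        om.leaderEpoch().ifPresent(epoch -> lastSeenEpochs.merge(tp, epoch, Integer::max));
    }

    public static void main(String[] args) {
        EpochSketch sketch = new EpochSketch();
        TopicPartition tp = new TopicPartition("orders", 0);

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Map.of(tp, new OffsetAndMetadata(10L, Optional.of(5)));
        offsets.forEach(sketch::updateLastSeenEpochIfNewer);

        // An older epoch does not overwrite the newer one.
        sketch.updateLastSeenEpochIfNewer(tp, new OffsetAndMetadata(11L, Optional.of(3)));
        System.out.println(sketch.lastSeenEpochs.get(tp)); // prints 5
    }
}
```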