Posted to dev@kafka.apache.org by "Yuriy Badalyantc (Jira)" <ji...@apache.org> on 2020/10/06 04:34:00 UTC
[jira] [Created] (KAFKA-10576) Different behavior of commitSync and commitAsync
Yuriy Badalyantc created KAFKA-10576:
----------------------------------------
Summary: Different behavior of commitSync and commitAsync
Key: KAFKA-10576
URL: https://issues.apache.org/jira/browse/KAFKA-10576
Project: Kafka
Issue Type: Bug
Components: consumer
Reporter: Yuriy Badalyantc
It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have different semantics.
{code:java}
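import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);
        String topic = "test";
        // Commit offset 1 for partition 0 without ever calling poll().
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}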
{code}
In the example above, I create a consumer and use {{commitSync}} to commit offsets. This code works as expected: all offsets are committed to Kafka.
But with {{commitAsync}}, it does not work:
{code:java}
// Same settings and offsets as in the first example.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    // Wait for the callback to complete the future.
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result.get}} call fails with a timeout.
This behavior is pretty surprising. From the naming and documentation, it looks like {{commitSync}} and {{commitAsync}} should behave identically, apart from the blocking/non-blocking aspect. But in reality, there are other differences.
I assume that the {{commitAsync}} method somehow depends on {{poll}} calls, but I did not find any explicit information about this in the {{KafkaConsumer}} Javadoc or on the Kafka documentation page.
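To make this assumption concrete, here is a minimal sketch of a toy client whose callbacks run only when the caller invokes {{poll}}. It is not Kafka code: {{PollDrivenClient}}, {{PollDrivenDemo}}, and {{completesWithin}} are hypothetical names, and the queueing behavior is only an assumed model of a single-threaded client. It shows how a future completed from such a callback can time out if {{poll}} is never called.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy model (hypothetical, NOT Kafka's implementation): a client with no
// background thread, so callbacks can only run inside poll().
class PollDrivenClient {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    // Returns immediately; the callback is queued, not invoked.
    void commitAsync(Runnable callback) {
        pendingCallbacks.add(callback);
    }

    // Invokes all queued callbacks on the caller's thread.
    void poll() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }
}

class PollDrivenDemo {
    // Returns true if the future completes within the given time, false otherwise.
    static boolean completesWithin(CompletableFuture<Boolean> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        PollDrivenClient client = new PollDrivenClient();
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        client.commitAsync(() -> result.complete(true));

        // Nothing drives the callback yet, so waiting on the future times out.
        System.out.println("before poll: " + completesWithin(result, 100)); // prints "before poll: false"

        client.poll(); // the callback runs here
        System.out.println("after poll: " + completesWithin(result, 100)); // prints "after poll: true"
    }
}
```

If the real consumer behaves like this model, blocking on {{result.get}} right after {{commitAsync}} would time out in the same way, because nothing invokes the callback in the meantime.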
So, I believe there are two possibilities:
# It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.