You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Yuriy Badalyantc (Jira)" <ji...@apache.org> on 2020/10/06 04:34:00 UTC

[jira] [Created] (KAFKA-10576) Different behavior of commitSync and commitAsync

Yuriy Badalyantc created KAFKA-10576:
----------------------------------------

             Summary: Different behavior of commitSync and commitAsync
                 Key: KAFKA-10576
                 URL: https://issues.apache.org/jira/browse/KAFKA-10576
             Project: Kafka
          Issue Type: Bug
          Components: consumer
            Reporter: Yuriy Badalyantc


It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic.
{code:java}
public class TestKafka {
    public static void main(String[]args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            consumer.commitSync(offsets);
        }
    }
}
{code}
In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka.

But in the case of {{commitAsync}} it will not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future failed with a timeout.

This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences.

I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page.

So, I believe that there are the next options:
# It's a but and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)