Posted to jira@kafka.apache.org by "Yuriy Badalyantc (Jira)" <ji...@apache.org> on 2020/10/06 04:35:00 UTC

[jira] [Updated] (KAFKA-10576) Different behavior of commitSync and commitAsync

     [ https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuriy Badalyantc updated KAFKA-10576:
-------------------------------------
    Description: 
It looks like the consumer's {{commitSync}} and {{commitAsync}} methods have different semantics.
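{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestKafka {
    public static void main(String[] args) {
        String id = "dev_test";
        Map<String, Object> settings = new HashMap<>();
        settings.put("bootstrap.servers", "localhost:9094");
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.put("client.id", id);
        settings.put("group.id", id);

        // Commit offset 1 for partition 0 of the topic.
        String topic = "test";
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
            // Blocks until the commit succeeds or fails.
            consumer.commitSync(offsets);
        }
    }
}
{code}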
In the example above I create a consumer and use {{commitSync}} to commit offsets. This code works as expected: the offsets are committed to Kafka.

But the same approach does not work with {{commitAsync}}:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
    CompletableFuture<Boolean> result = new CompletableFuture<>();
    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                result.completeExceptionally(exception);
            } else {
                result.complete(true);
            }
        }
    });
    result.get(15L, TimeUnit.SECONDS);
}
{code}
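The {{result}} future fails with a timeout: {{result.get}} throws a {{TimeoutException}} after 15 seconds because the callback is never invoked.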

This behavior is surprising. Judging by the names and the documentation, {{commitSync}} and {{commitAsync}} should behave identically apart from the blocking/non-blocking aspect, but in practice they do not.

I assume that {{commitAsync}} somehow depends on {{poll}} calls, but I could not find any explicit mention of this in the {{KafkaConsumer}} javadoc or on the Kafka documentation page.

So I see two possibilities:
 # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
 # It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.
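
To make the assumption about {{poll}} concrete, here is a minimal sketch of the mechanism I suspect. It is a toy model and not Kafka code: {{ToyConsumer}}, {{PollDrivenCallbackDemo}} and {{completesWithin}} are hypothetical names invented for illustration. The only thing it assumes is that the client has no background thread, so a callback registered by {{commitAsync}} can only run when the caller's thread re-enters the client through {{poll}}.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only, NOT Kafka code: a client that does all of its work,
// including callback invocation, on the caller's thread inside poll().
class ToyConsumer {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    // Queues the commit callback and returns immediately; nothing runs in the background.
    void commitAsync(Runnable onComplete) {
        pendingCallbacks.add(onComplete);
    }

    // Runs every queued callback on the calling thread and returns how many were run.
    int poll() {
        int invoked = 0;
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
            invoked++;
        }
        return invoked;
    }
}

class PollDrivenCallbackDemo {
    // Returns true if the future completes within the given number of milliseconds.
    static boolean completesWithin(CompletableFuture<Boolean> future, long millis) {
        try {
            future.get(millis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer();
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        consumer.commitAsync(() -> result.complete(true));

        // Blocking on the future without polling: the callback never gets a chance to run.
        System.out.println("completed without poll: " + completesWithin(result, 100));

        // A single poll() on the same thread runs the queued callback.
        consumer.poll();
        System.out.println("completed after poll: " + completesWithin(result, 100));
    }
}
```

In this model, blocking on the future right after {{commitAsync}} always times out, which matches what I observe with the real consumer. If the real client behaves the same way, the reproducer above would need to keep calling {{poll}} until the future completes, but this is my guess and should be confirmed by someone who knows the consumer internals.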


> Different behavior of commitSync and commitAsync
> ------------------------------------------------
>
>                 Key: KAFKA-10576
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10576
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Yuriy Badalyantc
>            Priority: Major
>


