You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Yuriy Badalyantc (Jira)" <ji...@apache.org> on 2020/10/06 04:35:00 UTC
[jira] [Updated] (KAFKA-10576) Different behavior of commitSync and
commitAsync
[ https://issues.apache.org/jira/browse/KAFKA-10576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuriy Badalyantc updated KAFKA-10576:
-------------------------------------
Description:
It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic.
{code:java}
public class TestKafka {
public static void main(String[]args) {
String id = "dev_test";
Map<String, Object> settings = new HashMap<>();
settings.put("bootstrap.servers", "localhost:9094");
settings.put("key.deserializer", StringDeserializer.class);
settings.put("value.deserializer", StringDeserializer.class);
settings.put("client.id", id);
settings.put("group.id", id);
String topic = "test";
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.commitSync(offsets);
}
}
}
{code}
In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka.
But in the case of {{commitAsync}} it will not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
result.completeExceptionally(exception);
} else {
result.complete(true);
}
}
});
result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future failed with a timeout.
This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences.
I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page.
So, I believe that there are the next options:
# It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.
was:
It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic.
{code:java}
public class TestKafka {
public static void main(String[]args) {
String id = "dev_test";
Map<String, Object> settings = new HashMap<>();
settings.put("bootstrap.servers", "localhost:9094");
settings.put("key.deserializer", StringDeserializer.class);
settings.put("value.deserializer", StringDeserializer.class);
settings.put("client.id", id);
settings.put("group.id", id);
String topic = "test";
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
consumer.commitSync(offsets);
}
}
}
{code}
In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka.
But in the case of {{commitAsync}} it will not work:
{code:java}
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
CompletableFuture<Boolean> result = new CompletableFuture<>();
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
result.completeExceptionally(exception);
} else {
result.complete(true);
}
}
});
result.get(15L, TimeUnit.SECONDS);
}
{code}
The {{result}} future failed with a timeout.
This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences.
I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page.
So, I believe that there are the next options:
# It's a but and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
# It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.
> Different behavior of commitSync and commitAsync
> ------------------------------------------------
>
> Key: KAFKA-10576
> URL: https://issues.apache.org/jira/browse/KAFKA-10576
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Reporter: Yuriy Badalyantc
> Priority: Major
>
> It looks like {{commitSync}} and {{commitAsync}} consumer's methods have a different semantic.
> {code:java}
> public class TestKafka {
> public static void main(String[]args) {
> String id = "dev_test";
> Map<String, Object> settings = new HashMap<>();
> settings.put("bootstrap.servers", "localhost:9094");
> settings.put("key.deserializer", StringDeserializer.class);
> settings.put("value.deserializer", StringDeserializer.class);
> settings.put("client.id", id);
> settings.put("group.id", id);
> String topic = "test";
> Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
> offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(1));
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
> consumer.commitSync(offsets);
> }
> }
> }
> {code}
> In the example above I created a consumer and use {{commitSync}} to commit offsets. This code works as expected — all offsets are committed to kafka.
> But in the case of {{commitAsync}} it will not work:
> {code:java}
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(settings)) {
> CompletableFuture<Boolean> result = new CompletableFuture<>();
> consumer.commitAsync(offsets, new OffsetCommitCallback() {
> @Override
> public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
> if (exception != null) {
> result.completeExceptionally(exception);
> } else {
> result.complete(true);
> }
> }
> });
> result.get(15L, TimeUnit.SECONDS);
> }
> {code}
> The {{result}} future failed with a timeout.
> This behavior is pretty surprising. From naming and documentation, it looks like {{commitSync}} and {{commitAsync}} methods should behave identically. Of course, besides the blocking/non-blocking aspect. But in reality, there are some differences.
> I can assume that the {{commitAsync}} method somehow depends on the {{poll}} calls. But I didn't find any explicit information about it in {{KafkaConsumer}}'s javadoc or kafka documentation page.
> So, I believe that there are the next options:
> # It's a bug and not expected behavior. {{commitSync}} and {{commitAsync}} should have identical semantics.
> # It's expected, but not well-documented behavior. In that case, this behavior should be explicitly documented.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)