You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/08 17:49:15 UTC
[kafka] branch trunk updated: KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ac267dc KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
ac267dc is described below
commit ac267dc5cec605f3981c9db7d889cabd59f09a61
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Feb 9 01:49:12 2018 +0800
KAFKA-6362; Async auto-commit should discover coordinator if it is unknown (#4326)
Currently `maybeAutoCommitOffsetsAsync` does not try to find the coordinator if it is unknown. As a result, asynchronous auto-commits will fail indefinitely. This patch changes the behavior to add coordinator discovery to the async auto-commit path.
---
.../apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../consumer/internals/ConsumerCoordinator.java | 20 +++++++-------------
.../consumer/internals/ConsumerCoordinatorTest.java | 19 +++++++++++++++++++
3 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1d84f84..2f7fd58 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1058,7 +1058,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
- this.coordinator.maybeAutoCommitOffsetsNow();
+ this.coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(new HashSet<>(partitions));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5c1e60e..d7c1ce9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -528,6 +528,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public void onSuccess(Void value) {
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
+ client.pollNoWakeup();
}
@Override
@@ -623,20 +624,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
return false;
}
- private void maybeAutoCommitOffsetsAsync(long now) {
- if (autoCommitEnabled) {
- if (coordinatorUnknown()) {
- this.nextAutoCommitDeadline = now + retryBackoffMs;
- } else if (now >= nextAutoCommitDeadline) {
- this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
- doAutoCommitOffsetsAsync();
- }
- }
- }
-
- public void maybeAutoCommitOffsetsNow() {
- if (autoCommitEnabled && !coordinatorUnknown())
+ public void maybeAutoCommitOffsetsAsync(long now) {
+ if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
doAutoCommitOffsetsAsync();
+ }
}
private void doAutoCommitOffsetsAsync() {
@@ -650,8 +641,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
if (exception instanceof RetriableException)
nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
+ else
+ nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
+ nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
}
}
});
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 76301a7..c49339b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1625,6 +1625,25 @@ public class ConsumerCoordinatorTest {
assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId));
}
+ @Test
+ public void testAutoCommitAfterCoordinatorBackToService() {
+ ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+ ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ subscriptions.seek(t1p, 100L);
+
+ coordinator.coordinatorDead();
+ assertTrue(coordinator.coordinatorUnknown());
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+
+ // async commit offset should find coordinator
+ time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
+ coordinator.maybeAutoCommitOffsetsAsync(time.milliseconds());
+ assertFalse(coordinator.coordinatorUnknown());
+ assertEquals(subscriptions.committed(t1p).offset(), 100L);
+ }
+
private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement,
final boolean autoCommit,
final boolean leaveGroup) {
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.