You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by lu...@apache.org on 2024/01/15 20:50:16 UTC
(kafka) branch trunk updated: KAFKA-16133 - Reconciliation auto-commit fix (#15194)
This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b16df3b103d KAFKA-16133 - Reconciliation auto-commit fix (#15194)
b16df3b103d is described below
commit b16df3b103d915d33670b8156217fc6c2b473f61
Author: Lianet Magrans <98...@users.noreply.github.com>
AuthorDate: Mon Jan 15 15:50:09 2024 -0500
KAFKA-16133 - Reconciliation auto-commit fix (#15194)
This fixes an issue with the time boundaries used for the auto-commit performed when partitions are revoked.
Reviewers: Lucas Brutschy <lb...@confluent.io>
---
.../consumer/internals/MembershipManagerImpl.java | 17 +++++++++++++++--
.../clients/consumer/internals/RequestManagers.java | 3 ++-
.../clients/consumer/internals/ConsumerTestBuilder.java | 3 ++-
.../consumer/internals/MembershipManagerImplTest.java | 13 ++++++++-----
.../integration/kafka/api/PlaintextConsumerTest.scala | 3 +--
5 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 4e3a35a4b64..ee95a0e3a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
@@ -275,6 +276,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
*/
private final BackgroundEventHandler backgroundEventHandler;
+ private final Time time;
+
public MembershipManagerImpl(String groupId,
Optional<String> groupInstanceId,
int rebalanceTimeoutMs,
@@ -284,7 +287,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
ConsumerMetadata metadata,
LogContext logContext,
Optional<ClientTelemetryReporter> clientTelemetryReporter,
- BackgroundEventHandler backgroundEventHandler) {
+ BackgroundEventHandler backgroundEventHandler,
+ Time time) {
this.groupId = groupId;
this.state = MemberState.UNSUBSCRIBED;
this.serverAssignor = serverAssignor;
@@ -301,6 +305,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
this.clientTelemetryReporter = clientTelemetryReporter;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.backgroundEventHandler = backgroundEventHandler;
+ this.time = time;
}
/**
@@ -833,7 +838,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
// the current reconciliation is in process. Note this is using the rebalance timeout as
// it is the limit enforced by the broker to complete the reconciliation process.
commitResult = commitRequestManager.maybeAutoCommitAllConsumedNow(
- Optional.of((long) rebalanceTimeoutMs),
+ Optional.of(getExpirationTimeForTimeout(rebalanceTimeoutMs)),
true);
// Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
@@ -854,6 +859,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
return true;
}
+ long getExpirationTimeForTimeout(final long timeoutMs) {
+ long expiration = time.milliseconds() + timeoutMs;
+ if (expiration < 0) {
+ return Long.MAX_VALUE;
+ }
+ return expiration;
+ }
+
/**
* Trigger onPartitionsRevoked callbacks if any partitions where revoked. If it succeeds,
* proceed to trigger the onPartitionsAssigned (even if no new partitions were added), and
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 5305c004b95..88b909b4ede 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -178,7 +178,8 @@ public class RequestManagers implements Closeable {
metadata,
logContext,
clientTelemetryReporter,
- backgroundEventHandler);
+ backgroundEventHandler,
+ time);
membershipManager.registerStateListener(commit);
heartbeatRequestManager = new HeartbeatRequestManager(
logContext,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 013ddca9d54..28ebe43706e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -201,7 +201,8 @@ public class ConsumerTestBuilder implements Closeable {
metadata,
logContext,
Optional.empty(),
- backgroundEventHandler
+ backgroundEventHandler,
+ time
)
);
HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 1d462310ceb..ce424540466 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -89,6 +90,7 @@ public class MembershipManagerImplTest {
private ConsumerTestBuilder testBuilder;
private BlockingQueue<BackgroundEvent> backgroundEventQueue;
private BackgroundEventHandler backgroundEventHandler;
+ private Time time;
@BeforeEach
public void setup() {
@@ -98,6 +100,7 @@ public class MembershipManagerImplTest {
commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
backgroundEventQueue = testBuilder.backgroundEventQueue;
backgroundEventHandler = testBuilder.backgroundEventHandler;
+ time = testBuilder.time;
}
@AfterEach
@@ -111,7 +114,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
- backgroundEventHandler));
+ backgroundEventHandler, time));
manager.transitionToJoining();
return manager;
}
@@ -120,7 +123,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = spy(new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
- backgroundEventHandler));
+ backgroundEventHandler, time));
manager.transitionToJoining();
return manager;
}
@@ -130,7 +133,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
- metadata, logContext, Optional.empty(), backgroundEventHandler);
+ metadata, logContext, Optional.empty(), backgroundEventHandler, time);
manager.transitionToJoining();
return manager;
}
@@ -156,7 +159,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl manager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
- backgroundEventHandler);
+ backgroundEventHandler, time);
manager.transitionToJoining();
verify(metadata).addClusterUpdateListener(manager);
clearInvocations(metadata);
@@ -235,7 +238,7 @@ public class MembershipManagerImplTest {
MembershipManagerImpl membershipManager = new MembershipManagerImpl(
GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
- backgroundEventHandler);
+ backgroundEventHandler, time);
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
membershipManager.transitionToJoining();
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index faab3a4782a..cb568760514 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1553,9 +1553,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
- // TODO: enable this test for the consumer group protocol when auto-commit support is implemented.
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAutoCommitOnRebalance(quorum: String, groupProtocol: String): Unit = {
val topic2 = "topic2"
createTopic(topic2, 2, brokerCount)