You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by lu...@apache.org on 2024/01/15 20:50:16 UTC

(kafka) branch trunk updated: KAFKA-16133 - Reconciliation auto-commit fix (#15194)

This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b16df3b103d KAFKA-16133 - Reconciliation auto-commit fix (#15194)
b16df3b103d is described below

commit b16df3b103d915d33670b8156217fc6c2b473f61
Author: Lianet Magrans <98...@users.noreply.github.com>
AuthorDate: Mon Jan 15 15:50:09 2024 -0500

    KAFKA-16133 - Reconciliation auto-commit fix (#15194)
    
    This fixes an issue with the time boundaries used for the auto-commit performed when partitions are revoked.
    
    Reviewers: Lucas Brutschy <lb...@confluent.io>
---
 .../consumer/internals/MembershipManagerImpl.java       | 17 +++++++++++++++--
 .../clients/consumer/internals/RequestManagers.java     |  3 ++-
 .../clients/consumer/internals/ConsumerTestBuilder.java |  3 ++-
 .../consumer/internals/MembershipManagerImplTest.java   | 13 ++++++++-----
 .../integration/kafka/api/PlaintextConsumerTest.scala   |  3 +--
 5 files changed, 28 insertions(+), 11 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 4e3a35a4b64..ee95a0e3a50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -38,6 +38,7 @@ import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
@@ -275,6 +276,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
      */
     private final BackgroundEventHandler backgroundEventHandler;
 
+    private final Time time;
+
     public MembershipManagerImpl(String groupId,
                                  Optional<String> groupInstanceId,
                                  int rebalanceTimeoutMs,
@@ -284,7 +287,8 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
                                  ConsumerMetadata metadata,
                                  LogContext logContext,
                                  Optional<ClientTelemetryReporter> clientTelemetryReporter,
-                                 BackgroundEventHandler backgroundEventHandler) {
+                                 BackgroundEventHandler backgroundEventHandler,
+                                 Time time) {
         this.groupId = groupId;
         this.state = MemberState.UNSUBSCRIBED;
         this.serverAssignor = serverAssignor;
@@ -301,6 +305,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.backgroundEventHandler = backgroundEventHandler;
+        this.time = time;
     }
 
     /**
@@ -833,7 +838,7 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
         // the current reconciliation is in process. Note this is using the rebalance timeout as
         // it is the limit enforced by the broker to complete the reconciliation process.
         commitResult = commitRequestManager.maybeAutoCommitAllConsumedNow(
-            Optional.of((long) rebalanceTimeoutMs),
+            Optional.of(getExpirationTimeForTimeout(rebalanceTimeoutMs)),
             true);
 
         // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
@@ -854,6 +859,14 @@ public class MembershipManagerImpl implements MembershipManager, ClusterResource
         return true;
     }
 
+    long getExpirationTimeForTimeout(final long timeoutMs) {
+        long expiration = time.milliseconds() + timeoutMs;
+        if (expiration < 0) {
+            return Long.MAX_VALUE;
+        }
+        return expiration;
+    }
+
     /**
      * Trigger onPartitionsRevoked callbacks if any partitions where revoked. If it succeeds,
      * proceed to trigger the onPartitionsAssigned (even if no new partitions were added), and
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 5305c004b95..88b909b4ede 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -178,7 +178,8 @@ public class RequestManagers implements Closeable {
                             metadata,
                             logContext,
                             clientTelemetryReporter,
-                            backgroundEventHandler);
+                            backgroundEventHandler,
+                            time);
                     membershipManager.registerStateListener(commit);
                     heartbeatRequestManager = new HeartbeatRequestManager(
                             logContext,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 013ddca9d54..28ebe43706e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -201,7 +201,8 @@ public class ConsumerTestBuilder implements Closeable {
                         metadata,
                         logContext,
                         Optional.empty(),
-                        backgroundEventHandler
+                        backgroundEventHandler,
+                        time
                 )
             );
             HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 1d462310ceb..ce424540466 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -89,6 +90,7 @@ public class MembershipManagerImplTest {
     private ConsumerTestBuilder testBuilder;
     private BlockingQueue<BackgroundEvent> backgroundEventQueue;
     private BackgroundEventHandler backgroundEventHandler;
+    private Time time;
 
     @BeforeEach
     public void setup() {
@@ -98,6 +100,7 @@ public class MembershipManagerImplTest {
         commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
         backgroundEventQueue = testBuilder.backgroundEventQueue;
         backgroundEventHandler = testBuilder.backgroundEventHandler;
+        time = testBuilder.time;
     }
 
     @AfterEach
@@ -111,7 +114,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl manager = spy(new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler));
+                backgroundEventHandler, time));
         manager.transitionToJoining();
         return manager;
     }
@@ -120,7 +123,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl manager = spy(new MembershipManagerImpl(
                 GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler));
+                backgroundEventHandler, time));
         manager.transitionToJoining();
         return manager;
     }
@@ -130,7 +133,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
                 Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager,
-                metadata, logContext, Optional.empty(), backgroundEventHandler);
+                metadata, logContext, Optional.empty(), backgroundEventHandler, time);
         manager.transitionToJoining();
         return manager;
     }
@@ -156,7 +159,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl manager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler);
+                backgroundEventHandler, time);
         manager.transitionToJoining();
         verify(metadata).addClusterUpdateListener(manager);
         clearInvocations(metadata);
@@ -235,7 +238,7 @@ public class MembershipManagerImplTest {
         MembershipManagerImpl membershipManager = new MembershipManagerImpl(
                 GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, logContext, Optional.empty(),
-                backgroundEventHandler);
+                backgroundEventHandler, time);
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
         membershipManager.transitionToJoining();
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index faab3a4782a..cb568760514 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -1553,9 +1553,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
   }
 
-  // TODO: enable this test for the consumer group protocol when auto-commit support is implemented.
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testAutoCommitOnRebalance(quorum: String, groupProtocol: String): Unit = {
     val topic2 = "topic2"
     createTopic(topic2, 2, brokerCount)