You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/10 02:53:51 UTC

[kafka] branch trunk updated: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 798275f  KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)
798275f is described below

commit 798275f25401b41af6a9703c117173b687d218ba
Author: Vincent Jiang <84...@users.noreply.github.com>
AuthorDate: Wed Mar 9 18:52:05 2022 -0800

    KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)
    
    Reviewer: Luke Chen <sh...@gmail.com>, David Jacot <dj...@confluent.io>
---
 .../clients/consumer/internals/ConsumerCoordinator.java    |  7 +++++--
 .../integration/kafka/api/AuthorizerIntegrationTest.scala  | 14 ++++++++++++--
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6678137..84cf822 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1000,8 +1000,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
-        RequestFuture<Void> future =  null;
-        if (!coordinatorUnknown()) {
+        RequestFuture<Void> future = null;
+        if (offsets.isEmpty()) {
+            // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+            future = doCommitOffsetsAsync(offsets, callback);
+        } else if (!coordinatorUnknown()) {
             future = doCommitOffsetsAsync(offsets, callback);
         } else {
             // we don't know the current coordinator, so try to find it and then send the commit
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 377290d..a4323be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,7 +18,6 @@ import java.util
 import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{Collections, Optional, Properties}
-
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
 import kafka.log.LogConfig
 import kafka.security.authorizer.{AclAuthorizer, AclEntry}
@@ -66,9 +65,10 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
-import java.util.Collections.singletonList
 
+import java.util.Collections.singletonList
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
+import org.junit.jupiter.api.function.Executable
 
 import scala.annotation.nowarn
 import scala.collection.mutable
@@ -2483,6 +2483,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }
 
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
+    val consumer = createConsumer()
+    val closeConsumer: Executable = () => consumer.close()
+    // Close consumer without consuming anything. close() call should pass successfully and throw no exception.
+    assertDoesNotThrow(closeConsumer, "Exception not expected on closing consumer")
+  }
+
   private def testDescribeClusterClusterAuthorizedOperations(
     version: Short,
     expectedClusterAuthorizedOperations: Int