You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/10 02:53:51 UTC
[kafka] branch trunk updated: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 798275f KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)
798275f is described below
commit 798275f25401b41af6a9703c117173b687d218ba
Author: Vincent Jiang <84...@users.noreply.github.com>
AuthorDate: Wed Mar 9 18:52:05 2022 -0800
KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty (#11864)
Reviewer: Luke Chen <sh...@gmail.com>, David Jacot <dj...@confluent.io>
---
.../clients/consumer/internals/ConsumerCoordinator.java | 7 +++++--
.../integration/kafka/api/AuthorizerIntegrationTest.scala | 14 ++++++++++++--
2 files changed, 17 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 6678137..84cf822 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -1000,8 +1000,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
- RequestFuture<Void> future = null;
- if (!coordinatorUnknown()) {
+ RequestFuture<Void> future = null;
+ if (offsets.isEmpty()) {
+ // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+ future = doCommitOffsetsAsync(offsets, callback);
+ } else if (!coordinatorUnknown()) {
future = doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 377290d..a4323be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -18,7 +18,6 @@ import java.util
import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import java.util.{Collections, Optional, Properties}
-
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService}
import kafka.log.LogConfig
import kafka.security.authorizer.{AclAuthorizer, AclEntry}
@@ -66,9 +65,10 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
-import java.util.Collections.singletonList
+import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
+import org.junit.jupiter.api.function.Executable
import scala.annotation.nowarn
import scala.collection.mutable
@@ -2483,6 +2483,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
+
+ @ParameterizedTest
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
+ val consumer = createConsumer()
+ val closeConsumer: Executable = () => consumer.close()
+ // Close consumer without consuming anything. close() call should pass successfully and throw no exception.
+ assertDoesNotThrow(closeConsumer, "Exception not expected on closing consumer")
+ }
+
private def testDescribeClusterClusterAuthorizedOperations(
version: Short,
expectedClusterAuthorizedOperations: Int