You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/10/11 03:41:56 UTC

[kafka] branch trunk updated: KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3c9031c6245 KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)
3c9031c6245 is described below

commit 3c9031c62455e4eaa3f5d16a3bba94d7e3159fb6
Author: Gantigmaa Selenge <39...@users.noreply.github.com>
AuthorDate: Wed Oct 11 04:41:46 2023 +0100

    KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing (#14455)
    
    AdminClient will throw IllegalStateException instead of TimeoutException if it receives new calls while closing down. This is more consistent with how the Consumer and Producer clients handle new calls after they have been closed.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Kirk True <ki...@kirktrue.pro>, Kamal Chandraprakash <ka...@gmail.com>, vamossagar12 <sa...@gmail.com>
---
 .../org/apache/kafka/clients/admin/KafkaAdminClient.java     |  5 ++---
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 12 ++++++++++++
 .../kafka/api/PlaintextAdminIntegrationTest.scala            |  4 ++--
 3 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 4db6b271946..27d28b5b336 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1546,9 +1546,8 @@ public class KafkaAdminClient extends AdminClient {
          */
         void call(Call call, long now) {
             if (hardShutdownTimeMs.get() != INVALID_SHUTDOWN_TIME) {
-                log.debug("The AdminClient is not accepting new calls. Timing out {}.", call);
-                call.handleTimeoutFailure(time.milliseconds(),
-                    new TimeoutException("The AdminClient thread is not accepting new calls."));
+                log.debug("Cannot accept new call {} when AdminClient is closing.", call);
+                call.handleFailure(new IllegalStateException("Cannot accept new calls when AdminClient is closing."));
             } else if (metadataManager.usingBootstrapControllers() &&
                     (!call.nodeProvider.supportsUseControllers())) {
                 call.fail(now, new UnsupportedEndpointTypeException("This Admin API is not " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 229d3119871..378e08b2c45 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -478,6 +478,18 @@ public class KafkaAdminClientTest {
         callbackCalled.acquire();
     }
 
+    @Test
+    public void testAdminClientFailureWhenClosed() {
+        MockTime time = new MockTime();
+        AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, mockCluster(3, 0));
+        env.adminClient().close();
+        ExecutionException e = assertThrows(ExecutionException.class, () -> env.adminClient().createTopics(
+                singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+                new CreateTopicsOptions().timeoutMs(10000)).all().get());
+        assertTrue(e.getCause() instanceof IllegalStateException,
+                "Expected an IllegalStateException error, but got " + Utils.stackTrace(e));
+    }
+
     private static OffsetDeleteResponse prepareOffsetDeleteResponse(Errors error) {
         return new OffsetDeleteResponse(
             new OffsetDeleteResponseData()
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 784374d23e8..5bb3533146c 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1026,7 +1026,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 
   /**
     * Test closing the AdminClient with a generous timeout.  Calls in progress should be completed,
-    * since they can be done within the timeout.  New calls should receive timeouts.
+    * since they can be done within the timeout.  New calls should receive exceptions.
     */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
@@ -1037,7 +1037,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
     client.close(time.Duration.ofHours(2))
     val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
+    assertFutureExceptionTypeEquals(future2, classOf[IllegalStateException])
     future.get
     client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect
   }