You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/05 07:06:19 UTC

[kafka] branch trunk updated: MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53d4267  MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)
53d4267 is described below

commit 53d4267c59177cd826e1b0645168b1e9330bdbcd
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Apr 5 15:06:14 2018 +0800

    MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)
    
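    Invalid topic names are already handled locally, so it is unnecessary to include them in the DeleteTopicsRequest, or to send the request at all when no valid topic names remain. This PR also adds a request counter to MockClient so that tests can verify no request was sent.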
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/clients/admin/KafkaAdminClient.java  |  8 +++++---
 clients/src/test/java/org/apache/kafka/clients/MockClient.java | 10 +++++++++-
 .../org/apache/kafka/clients/admin/KafkaAdminClientTest.java   |  4 ++++
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e455b9c..5118953 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1150,9 +1150,10 @@ public class KafkaAdminClient extends AdminClient {
     }
 
     @Override
-    public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+    public DeleteTopicsResult deleteTopics(Collection<String> topicNames,
                                            DeleteTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+        final List<String> validTopicNames = new ArrayList<>(topicNames.size());
         for (String topicName : topicNames) {
             if (topicNameIsUnrepresentable(topicName)) {
                 KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@@ -1161,6 +1162,7 @@ public class KafkaAdminClient extends AdminClient {
                 topicFutures.put(topicName, future);
             } else if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+                validTopicNames.add(topicName);
             }
         }
         final long now = time.milliseconds();
@@ -1169,7 +1171,7 @@ public class KafkaAdminClient extends AdminClient {
 
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+                return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs);
             }
 
             @Override
@@ -1204,7 +1206,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNames.isEmpty()) {
+        if (!validTopicNames.isEmpty()) {
             runnable.call(call, now);
         }
         return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 60af9bc..a73175c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -88,7 +89,7 @@ public class MockClient implements KafkaClient {
     private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
     private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
     private volatile int numBlockingWakeups = 0;
-
+    private final AtomicInteger totalRequestCount = new AtomicInteger(0);
     public MockClient(Time time) {
         this(time, null);
     }
@@ -394,6 +395,7 @@ public class MockClient implements KafkaClient {
         futureResponses.clear();
         metadataUpdates.clear();
         authenticationErrors.clear();
+        totalRequestCount.set(0);
     }
 
     public boolean hasPendingMetadataUpdates() {
@@ -461,6 +463,7 @@ public class MockClient implements KafkaClient {
     @Override
     public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
                                           boolean expectResponse, RequestCompletionHandler callback) {
+        totalRequestCount.incrementAndGet();
         return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs,
                 expectResponse, callback);
     }
@@ -503,4 +506,9 @@ public class MockClient implements KafkaClient {
             this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
     }
+
+    // visible for testing
+    public int totalRequestCount() {
+        return totalRequestCount.get();
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f08a99b..0d4dee6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -230,22 +230,26 @@ public class KafkaAdminClientTest {
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
             }
+            assertEquals(0, env.kafkaClient().totalRequestCount());
 
             Map<String, KafkaFuture<TopicDescription>> describeFutures =
                     env.adminClient().describeTopics(sillyTopicNames).values();
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
             }
+            assertEquals(0, env.kafkaClient().totalRequestCount());
 
             List<NewTopic> newTopics = new ArrayList<>();
             for (String sillyTopicName : sillyTopicNames) {
                 newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
             }
+
             Map<String, KafkaFuture<Void>> createFutures =
                 env.adminClient().createTopics(newTopics).values();
             for (String sillyTopicName : sillyTopicNames) {
                 assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
             }
+            assertEquals(0, env.kafkaClient().totalRequestCount());
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.