You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/30 22:47:33 UTC

[kafka] branch trunk updated: KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef93998  KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)
ef93998 is described below

commit ef93998fa7e31f6222d6f0b12b9a01fa785faf48
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Tue Jan 30 14:47:28 2018 -0800

    KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 51 ++++++++++++++++++----
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 33 ++++++++++++++
 2 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6d6788d..36cbe6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1077,19 +1077,33 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
+    /**
+     * Returns true if a topic name cannot be represented in an RPC.  This function does NOT check
+     * whether the name is too long, contains invalid characters, etc.  It is better to enforce
+     * those policies on the server, so that they can be changed in the future if needed.
+     */
+    private static boolean topicNameIsUnrepresentable(String topicName) {
+        return (topicName == null) || topicName.isEmpty();
+    }
+
     @Override
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
         final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
         for (NewTopic newTopic : newTopics) {
-            if (topicFutures.get(newTopic.name()) == null) {
+            if (topicNameIsUnrepresentable(newTopic.name())) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    newTopic.name() + "' cannot be represented in a request."));
+                topicFutures.put(newTopic.name(), future);
+            } else if (!topicFutures.containsKey(newTopic.name())) {
                 topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
                 topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             @Override
@@ -1128,7 +1142,10 @@ public class KafkaAdminClient extends AdminClient {
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicsMap.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
@@ -1137,12 +1154,17 @@ public class KafkaAdminClient extends AdminClient {
                                            DeleteTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
         for (String topicName : topicNames) {
-            if (topicFutures.get(topicName) == null) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<Void>());
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             @Override
@@ -1181,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient {
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicNames.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
     }
 
@@ -1223,13 +1248,18 @@ public class KafkaAdminClient extends AdminClient {
         final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
         final ArrayList<String> topicNamesList = new ArrayList<>();
         for (String topicName : topicNames) {
-            if (!topicFutures.containsKey(topicName)) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
                 topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
                 topicNamesList.add(topicName);
             }
         }
         final long now = time.milliseconds();
-        runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
             new ControllerNodeProvider()) {
 
             private boolean supportsDisablingTopicCreation = true;
@@ -1298,7 +1328,10 @@ public class KafkaAdminClient extends AdminClient {
             void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
-        }, now);
+        };
+        if (!topicNamesList.isEmpty()) {
+            runnable.call(call, now);
+        }
         return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 84588a9..186ccf0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -215,6 +216,38 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testInvalidTopicNames() throws Exception {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
+            Map<String, KafkaFuture<Void>> deleteFutures =
+                env.adminClient().deleteTopics(sillyTopicNames).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+            }
+
+            Map<String, KafkaFuture<TopicDescription>> describeFutures =
+                    env.adminClient().describeTopics(sillyTopicNames).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+            }
+
+            List<NewTopic> newTopics = new ArrayList<>();
+            for (String sillyTopicName : sillyTopicNames) {
+                newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
+            }
+            Map<String, KafkaFuture<Void>> createFutures =
+                env.adminClient().createTopics(newTopics).values();
+            for (String sillyTopicName : sillyTopicNames) {
+                assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+            }
+        }
+    }
+
     private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
         new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
     private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.