You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/30 22:47:33 UTC
[kafka] branch trunk updated: KAFKA-6418;
AdminClient should handle empty or null topic names better (#4470)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ef93998 KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)
ef93998 is described below
commit ef93998fa7e31f6222d6f0b12b9a01fa785faf48
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Tue Jan 30 14:47:28 2018 -0800
KAFKA-6418; AdminClient should handle empty or null topic names better (#4470)
---
.../kafka/clients/admin/KafkaAdminClient.java | 51 ++++++++++++++++++----
.../kafka/clients/admin/KafkaAdminClientTest.java | 33 ++++++++++++++
2 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6d6788d..36cbe6c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1077,19 +1077,33 @@ public class KafkaAdminClient extends AdminClient {
}
}
+ /**
+ * Returns true if a topic name cannot be represented in an RPC. This function does NOT check
+ * whether the name is too long, contains invalid characters, etc. It is better to enforce
+ * those policies on the server, so that they can be changed in the future if needed.
+ */
+ private static boolean topicNameIsUnrepresentable(String topicName) {
+ return (topicName == null) || topicName.isEmpty();
+ }
+
@Override
public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
final CreateTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size());
for (NewTopic newTopic : newTopics) {
- if (topicFutures.get(newTopic.name()) == null) {
+ if (topicNameIsUnrepresentable(newTopic.name())) {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+ newTopic.name() + "' cannot be represented in a request."));
+ topicFutures.put(newTopic.name(), future);
+ } else if (!topicFutures.containsKey(newTopic.name())) {
topicFutures.put(newTopic.name(), new KafkaFutureImpl<Void>());
topicsMap.put(newTopic.name(), newTopic.convertToTopicDetails());
}
}
final long now = time.milliseconds();
- runnable.call(new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
+ Call call = new Call("createTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
@@ -1128,7 +1142,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
- }, now);
+ };
+ if (!topicsMap.isEmpty()) {
+ runnable.call(call, now);
+ }
return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@@ -1137,12 +1154,17 @@ public class KafkaAdminClient extends AdminClient {
DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
for (String topicName : topicNames) {
- if (topicFutures.get(topicName) == null) {
+ if (topicNameIsUnrepresentable(topicName)) {
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request."));
+ topicFutures.put(topicName, future);
+ } else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<Void>());
}
}
final long now = time.milliseconds();
- runnable.call(new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
+ Call call = new Call("deleteTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
@Override
@@ -1181,7 +1203,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
- }, now);
+ };
+ if (!topicNames.isEmpty()) {
+ runnable.call(call, now);
+ }
return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
}
@@ -1223,13 +1248,18 @@ public class KafkaAdminClient extends AdminClient {
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
- if (!topicFutures.containsKey(topicName)) {
+ if (topicNameIsUnrepresentable(topicName)) {
+ KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<TopicDescription>();
+ future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+ topicName + "' cannot be represented in a request."));
+ topicFutures.put(topicName, future);
+ } else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
topicNamesList.add(topicName);
}
}
final long now = time.milliseconds();
- runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
+ Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {
private boolean supportsDisablingTopicCreation = true;
@@ -1298,7 +1328,10 @@ public class KafkaAdminClient extends AdminClient {
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
}
- }, now);
+ };
+ if (!topicNamesList.isEmpty()) {
+ runnable.call(call, now);
+ }
return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 84588a9..186ccf0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -64,6 +64,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -215,6 +216,38 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testInvalidTopicNames() throws Exception {
+ try (AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+ env.kafkaClient().setNode(env.cluster().controller());
+
+ List<String> sillyTopicNames = Arrays.asList(new String[] {"", null});
+ Map<String, KafkaFuture<Void>> deleteFutures =
+ env.adminClient().deleteTopics(sillyTopicNames).values();
+ for (String sillyTopicName : sillyTopicNames) {
+ assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
+ }
+
+ Map<String, KafkaFuture<TopicDescription>> describeFutures =
+ env.adminClient().describeTopics(sillyTopicNames).values();
+ for (String sillyTopicName : sillyTopicNames) {
+ assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
+ }
+
+ List<NewTopic> newTopics = new ArrayList<>();
+ for (String sillyTopicName : sillyTopicNames) {
+ newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
+ }
+ Map<String, KafkaFuture<Void>> createFutures =
+ env.adminClient().createTopics(newTopics).values();
+ for (String sillyTopicName : sillyTopicNames) {
+ assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
+ }
+ }
+ }
+
private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW));
private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"),
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.