You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/04/05 07:06:19 UTC
[kafka] branch trunk updated: MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53d4267 MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)
53d4267 is described below
commit 53d4267c59177cd826e1b0645168b1e9330bdbcd
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Thu Apr 5 15:06:14 2018 +0800
MINOR: Don’t send the DeleteTopicsRequest for invalid topic names (#4763)
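Invalid topic names are already handled locally, so it is unnecessary to include them in the DeleteTopicsRequest (or to send the request at all when no valid topic names remain). This PR also adds a request counter to MockClient so that tests can verify that no request was sent.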
Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Jason Gustafson <ja...@confluent.io>
---
.../java/org/apache/kafka/clients/admin/KafkaAdminClient.java | 8 +++++---
clients/src/test/java/org/apache/kafka/clients/MockClient.java | 10 +++++++++-
.../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 4 ++++
3 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e455b9c..5118953 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1150,9 +1150,10 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public DeleteTopicsResult deleteTopics(final Collection<String> topicNames,
+ public DeleteTopicsResult deleteTopics(Collection<String> topicNames,
DeleteTopicsOptions options) {
final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size());
+ final List<String> validTopicNames = new ArrayList<>(topicNames.size());
for (String topicName : topicNames) {
if (topicNameIsUnrepresentable(topicName)) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@@ -1161,6 +1162,7 @@ public class KafkaAdminClient extends AdminClient {
topicFutures.put(topicName, future);
} else if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<Void>());
+ validTopicNames.add(topicName);
}
}
final long now = time.milliseconds();
@@ -1169,7 +1171,7 @@ public class KafkaAdminClient extends AdminClient {
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DeleteTopicsRequest.Builder(new HashSet<>(topicNames), timeoutMs);
+ return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs);
}
@Override
@@ -1204,7 +1206,7 @@ public class KafkaAdminClient extends AdminClient {
completeAllExceptionally(topicFutures.values(), throwable);
}
};
- if (!topicNames.isEmpty()) {
+ if (!validTopicNames.isEmpty()) {
runnable.call(call, now);
}
return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures));
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 60af9bc..a73175c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -88,7 +89,7 @@ public class MockClient implements KafkaClient {
private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>();
private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create();
private volatile int numBlockingWakeups = 0;
-
+ private final AtomicInteger totalRequestCount = new AtomicInteger(0);
public MockClient(Time time) {
this(time, null);
}
@@ -394,6 +395,7 @@ public class MockClient implements KafkaClient {
futureResponses.clear();
metadataUpdates.clear();
authenticationErrors.clear();
+ totalRequestCount.set(0);
}
public boolean hasPendingMetadataUpdates() {
@@ -461,6 +463,7 @@ public class MockClient implements KafkaClient {
@Override
public ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder, long createdTimeMs,
boolean expectResponse, RequestCompletionHandler callback) {
+ totalRequestCount.incrementAndGet();
return new ClientRequest(nodeId, requestBuilder, 0, "mockClientId", createdTimeMs,
expectResponse, callback);
}
@@ -503,4 +506,9 @@ public class MockClient implements KafkaClient {
this.expectMatchRefreshTopics = expectMatchRefreshTopics;
}
}
+
+ // visible for testing
+ public int totalRequestCount() {
+ return totalRequestCount.get();
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f08a99b..0d4dee6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -230,22 +230,26 @@ public class KafkaAdminClientTest {
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(deleteFutures.get(sillyTopicName), InvalidTopicException.class);
}
+ assertEquals(0, env.kafkaClient().totalRequestCount());
Map<String, KafkaFuture<TopicDescription>> describeFutures =
env.adminClient().describeTopics(sillyTopicNames).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(describeFutures.get(sillyTopicName), InvalidTopicException.class);
}
+ assertEquals(0, env.kafkaClient().totalRequestCount());
List<NewTopic> newTopics = new ArrayList<>();
for (String sillyTopicName : sillyTopicNames) {
newTopics.add(new NewTopic(sillyTopicName, 1, (short) 1));
}
+
Map<String, KafkaFuture<Void>> createFutures =
env.adminClient().createTopics(newTopics).values();
for (String sillyTopicName : sillyTopicNames) {
assertFutureError(createFutures .get(sillyTopicName), InvalidTopicException.class);
}
+ assertEquals(0, env.kafkaClient().totalRequestCount());
}
}
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.