You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/06/17 15:21:00 UTC
[kafka] branch 2.6 updated: KAFKA-10147
MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to
handle broker resource (#8853)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new be695c4 KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)
be695c4 is described below
commit be695c432613f3c84e309676e4eb926e806dade4
Author: Chia-Ping Tsai <ch...@gmail.com>
AuthorDate: Wed Jun 17 22:56:07 2020 +0800
KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (#8853)
Author: Chia-Ping Tsai <ch...@gmail.com>
Reviewers: Boyang Chen <bo...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../kafka/clients/admin/MockAdminClient.java | 72 +++++++---------------
1 file changed, 23 insertions(+), 49 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 2b86d4f..7c6a955 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
@@ -366,51 +367,6 @@ public class MockAdminClient extends AdminClient {
}
@Override
- public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
- Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new HashMap<>();
-
- if (timeoutNextRequests > 0) {
- for (ConfigResource requestedResource : resources) {
- KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new TimeoutException());
- topicConfigs.put(requestedResource, future);
- }
-
- --timeoutNextRequests;
- return new DescribeConfigsResult(topicConfigs);
- }
-
- for (ConfigResource requestedResource : resources) {
- if (requestedResource.type() != ConfigResource.Type.TOPIC) {
- continue;
- }
- for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
- String topicName = topicDescription.getKey();
- if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion) {
- if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
- topicDescription.getValue().fetchesRemainingUntilVisible--;
- } else {
- TopicMetadata topicMetadata = topicDescription.getValue();
- KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
- Collection<ConfigEntry> entries = new ArrayList<>();
- topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k, v)));
- future.complete(new Config(entries));
- topicConfigs.put(requestedResource, future);
- break;
- }
- }
- }
- if (!topicConfigs.containsKey(requestedResource)) {
- KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
- future.completeExceptionally(new UnknownTopicOrPartitionException("Resource " + requestedResource + " not found."));
- topicConfigs.put(requestedResource, future);
- }
- }
-
- return new DescribeConfigsResult(topicConfigs);
- }
-
- @Override
synchronized public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) {
Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
@@ -535,6 +491,19 @@ public class MockAdminClient extends AdminClient {
@Override
synchronized public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
+
+ if (timeoutNextRequests > 0) {
+ Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<>();
+ for (ConfigResource requestedResource : resources) {
+ KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new TimeoutException());
+ configs.put(requestedResource, future);
+ }
+
+ --timeoutNextRequests;
+ return new DescribeConfigsResult(configs);
+ }
+
Map<ConfigResource, KafkaFuture<Config>> results = new HashMap<>();
for (ConfigResource resource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
@@ -551,7 +520,7 @@ public class MockAdminClient extends AdminClient {
synchronized private Config getResourceDescription(ConfigResource resource) {
switch (resource.type()) {
case BROKER: {
- int brokerId = Integer.valueOf(resource.name());
+ int brokerId = Integer.parseInt(resource.name());
if (brokerId >= brokerConfigs.size()) {
throw new InvalidRequestException("Broker " + resource.name() +
" not found.");
@@ -560,10 +529,15 @@ public class MockAdminClient extends AdminClient {
}
case TOPIC: {
TopicMetadata topicMetadata = allTopics.get(resource.name());
- if (topicMetadata == null) {
- throw new UnknownTopicOrPartitionException();
+ if (topicMetadata != null && !topicMetadata.markedForDeletion) {
+ if (topicMetadata.fetchesRemainingUntilVisible > 0)
+ topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible - 1);
+ else return new Config(topicMetadata.configs.entrySet().stream()
+ .map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toList()));
+
}
- return toConfigObject(topicMetadata.configs);
+ throw new UnknownTopicOrPartitionException("Resource " + resource + " not found.");
}
default:
throw new UnsupportedOperationException("Not implemented yet");