You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/02 07:55:10 UTC
kafka git commit: KAFKA-5659; Fix error handling, efficiency issue in AdminClient#describeConfigs
Repository: kafka
Updated Branches:
refs/heads/trunk 14a7c297a -> b2a328daf
KAFKA-5659; Fix error handling, efficiency issue in AdminClient#describeConfigs
If a request for a broker's configuration failed because of a timeout or
because the broker was unavailable, the failure handler would fail the
futures associated with the non-broker request instead of the broker's
own future, so the broker future was never failed and would be left
uncompleted forever.
We would also send an unnecessary DescribeConfigs request for non-broker
resources (with an empty resource list) if only broker configs were
requested.
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3585 from cmccabe/KAFKA-5659
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2a328da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2a328da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2a328da
Branch: refs/heads/trunk
Commit: b2a328daf2a51520290fc4ec31ce5d92b2c0ef01
Parents: 14a7c29
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Sat Sep 2 08:14:13 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Sep 2 08:54:58 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/admin/KafkaAdminClient.java | 98 ++++++++++++--------
.../clients/admin/KafkaAdminClientTest.java | 17 ++++
2 files changed, 74 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2a328da/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a5f1d5c..dca9f16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1426,62 +1426,74 @@ public class KafkaAdminClient extends AdminClient {
@Override
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
- final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
- final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
-
+ final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
+
+ // The BROKER resources which we want to describe. We must make a separate DescribeConfigs
+ // request for every BROKER resource we want to describe.
final Collection<Resource> brokerResources = new ArrayList<>();
+ // The non-BROKER resources which we want to describe. These resources can be described by a
+ // single, unified DescribeConfigs request.
+ final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
+
for (ConfigResource resource : configResources) {
- if (resource.type() != ConfigResource.Type.BROKER) {
- singleRequestFutures.put(resource, new KafkaFutureImpl<Config>());
- singleRequestResources.add(configResourceToResource(resource));
- } else {
+ if (resource.type() == ConfigResource.Type.BROKER) {
brokerFutures.put(resource, new KafkaFutureImpl<Config>());
brokerResources.add(configResourceToResource(resource));
+ } else {
+ unifiedRequestFutures.put(resource, new KafkaFutureImpl<Config>());
+ unifiedRequestResources.add(configResourceToResource(resource));
}
}
final long now = time.milliseconds();
- runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+ if (!unifiedRequestResources.isEmpty()) {
+ runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
- @Override
- AbstractRequest.Builder createRequest(int timeoutMs) {
- return new DescribeConfigsRequest.Builder(singleRequestResources);
- }
+ @Override
+ AbstractRequest.Builder createRequest(int timeoutMs) {
+ return new DescribeConfigsRequest.Builder(unifiedRequestResources);
+ }
- @Override
- void handleResponse(AbstractResponse abstractResponse) {
- DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
- for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : singleRequestFutures.entrySet()) {
- ConfigResource configResource = entry.getKey();
- KafkaFutureImpl<Config> future = entry.getValue();
- DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
- if (config.error().isFailure()) {
- future.completeExceptionally(config.error().exception());
- continue;
- }
- List<ConfigEntry> configEntries = new ArrayList<>();
- for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
- configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+ for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
+ ConfigResource configResource = entry.getKey();
+ KafkaFutureImpl<Config> future = entry.getValue();
+ DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+ if (config == null) {
+ future.completeExceptionally(new UnknownServerException(
+ "Malformed broker response: missing config for " + configResource));
+ continue;
+ }
+ if (config.error().isFailure()) {
+ future.completeExceptionally(config.error().exception());
+ continue;
+ }
+ List<ConfigEntry> configEntries = new ArrayList<>();
+ for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+ configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ }
+ future.complete(new Config(configEntries));
}
- future.complete(new Config(configEntries));
}
- }
- @Override
- void handleFailure(Throwable throwable) {
- completeAllExceptionally(singleRequestFutures.values(), throwable);
- }
- }, now);
+ @Override
+ void handleFailure(Throwable throwable) {
+ completeAllExceptionally(unifiedRequestFutures.values(), throwable);
+ }
+ }, now);
+ }
for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
final Resource resource = configResourceToResource(entry.getKey());
- int nodeId = Integer.parseInt(resource.name());
- runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+ final int nodeId = Integer.parseInt(resource.name());
+ runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
new ConstantNodeIdProvider(nodeId)) {
@Override
@@ -1494,13 +1506,18 @@ public class KafkaAdminClient extends AdminClient {
DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
DescribeConfigsResponse.Config config = response.configs().get(resource);
+ if (config == null) {
+ brokerFuture.completeExceptionally(new UnknownServerException(
+ "Malformed broker response: missing config for " + resource));
+ return;
+ }
if (config.error().isFailure())
brokerFuture.completeExceptionally(config.error().exception());
else {
List<ConfigEntry> configEntries = new ArrayList<>();
for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
- configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+ configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
}
brokerFuture.complete(new Config(configEntries));
}
@@ -1508,15 +1525,14 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleFailure(Throwable throwable) {
- completeAllExceptionally(singleRequestFutures.values(), throwable);
+ brokerFuture.completeExceptionally(throwable);
}
}, now);
}
-
- Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
- allFutures.putAll(singleRequestFutures);
+ final Map<ConfigResource, KafkaFuture<Config>> allFutures = new HashMap<>();
allFutures.putAll(brokerFutures);
- return new DescribeConfigsResult(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+ allFutures.putAll(unifiedRequestFutures);
+ return new DescribeConfigsResult(allFutures);
}
private Resource configResourceToResource(ConfigResource configResource) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2a328da/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d41df48..96f7e8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.Future;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.requests.ResourceType.TOPIC;
+import static org.apache.kafka.common.requests.ResourceType.BROKER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -357,6 +358,22 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testDescribeConfigs() throws Exception {
+ try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+ env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+ env.kafkaClient().setNode(env.cluster().controller());
+ env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
+ Collections.singletonMap(new org.apache.kafka.common.requests.Resource(BROKER, "0"),
+ new DescribeConfigsResponse.Config(ApiError.NONE,
+ Collections.<DescribeConfigsResponse.ConfigEntry>emptySet()))));
+ DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
+ new ConfigResource(ConfigResource.Type.BROKER, "0")));
+ result2.all().get();
+ }
+ }
+
private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
for (T element : elements) {
assertTrue("Did not find " + element, collection.contains(element));