You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/02 07:55:10 UTC

kafka git commit: KAFKA-5659; Fix error handling, efficiency issue in AdminClient#describeConfigs

Repository: kafka
Updated Branches:
  refs/heads/trunk 14a7c297a -> b2a328daf


KAFKA-5659; Fix error handling, efficiency issue in AdminClient#describeConfigs

If a request for a broker configuration failed due to a timeout or
the broker not being available, we would fail the futures
associated with the non-broker request instead (and never fail the
broker future, which would be left uncompleted forever).

We would also do an unnecessary request if only broker configs were
requested.

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3585 from cmccabe/KAFKA-5659


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2a328da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2a328da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2a328da

Branch: refs/heads/trunk
Commit: b2a328daf2a51520290fc4ec31ce5d92b2c0ef01
Parents: 14a7c29
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Sat Sep 2 08:14:13 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Sep 2 08:54:58 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   | 98 ++++++++++++--------
 .../clients/admin/KafkaAdminClientTest.java     | 17 ++++
 2 files changed, 74 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b2a328da/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a5f1d5c..dca9f16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1426,62 +1426,74 @@ public class KafkaAdminClient extends AdminClient {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>();
-        final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size());
-
+        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
         final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
+
+        // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
+        // request for every BROKER resource we want to describe.
         final Collection<Resource> brokerResources = new ArrayList<>();
 
+        // The non-BROKER resources which we want to describe.  These resources can be described by a
+        // single, unified DescribeConfigs request.
+        final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
+
         for (ConfigResource resource : configResources) {
-            if (resource.type() != ConfigResource.Type.BROKER) {
-                singleRequestFutures.put(resource, new KafkaFutureImpl<Config>());
-                singleRequestResources.add(configResourceToResource(resource));
-            } else {
+            if (resource.type() == ConfigResource.Type.BROKER) {
                 brokerFutures.put(resource, new KafkaFutureImpl<Config>());
                 brokerResources.add(configResourceToResource(resource));
+            } else {
+                unifiedRequestFutures.put(resource, new KafkaFutureImpl<Config>());
+                unifiedRequestResources.add(configResourceToResource(resource));
             }
         }
 
         final long now = time.milliseconds();
-        runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+        if (!unifiedRequestResources.isEmpty()) {
+            runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
                 new LeastLoadedNodeProvider()) {
 
-            @Override
-            AbstractRequest.Builder createRequest(int timeoutMs) {
-                return new DescribeConfigsRequest.Builder(singleRequestResources);
-            }
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new DescribeConfigsRequest.Builder(unifiedRequestResources);
+                }
 
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : singleRequestFutures.entrySet()) {
-                    ConfigResource configResource = entry.getKey();
-                    KafkaFutureImpl<Config> future = entry.getValue();
-                    DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
-                    if (config.error().isFailure()) {
-                        future.completeExceptionally(config.error().exception());
-                        continue;
-                    }
-                    List<ConfigEntry> configEntries = new ArrayList<>();
-                    for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                        configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
+                    for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
+                        ConfigResource configResource = entry.getKey();
+                        KafkaFutureImpl<Config> future = entry.getValue();
+                        DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+                        if (config == null) {
+                            future.completeExceptionally(new UnknownServerException(
+                                "Malformed broker response: missing config for " + configResource));
+                            continue;
+                        }
+                        if (config.error().isFailure()) {
+                            future.completeExceptionally(config.error().exception());
+                            continue;
+                        }
+                        List<ConfigEntry> configEntries = new ArrayList<>();
+                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
+                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
                                 configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                        }
+                        future.complete(new Config(configEntries));
                     }
-                    future.complete(new Config(configEntries));
                 }
-            }
 
-            @Override
-            void handleFailure(Throwable throwable) {
-                completeAllExceptionally(singleRequestFutures.values(), throwable);
-            }
-        }, now);
+                @Override
+                void handleFailure(Throwable throwable) {
+                    completeAllExceptionally(unifiedRequestFutures.values(), throwable);
+                }
+            }, now);
+        }
 
         for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
             final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
             final Resource resource = configResourceToResource(entry.getKey());
-            int nodeId = Integer.parseInt(resource.name());
-            runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
+            final int nodeId = Integer.parseInt(resource.name());
+            runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
                     new ConstantNodeIdProvider(nodeId)) {
 
                 @Override
@@ -1494,13 +1506,18 @@ public class KafkaAdminClient extends AdminClient {
                     DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
                     DescribeConfigsResponse.Config config = response.configs().get(resource);
 
+                    if (config == null) {
+                        brokerFuture.completeExceptionally(new UnknownServerException(
+                            "Malformed broker response: missing config for " + resource));
+                        return;
+                    }
                     if (config.error().isFailure())
                         brokerFuture.completeExceptionally(config.error().exception());
                     else {
                         List<ConfigEntry> configEntries = new ArrayList<>();
                         for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
                             configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
-                                    configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                                configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
                         }
                         brokerFuture.complete(new Config(configEntries));
                     }
@@ -1508,15 +1525,14 @@ public class KafkaAdminClient extends AdminClient {
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    completeAllExceptionally(singleRequestFutures.values(), throwable);
+                    brokerFuture.completeExceptionally(throwable);
                 }
             }, now);
         }
-
-        Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size());
-        allFutures.putAll(singleRequestFutures);
+        final Map<ConfigResource, KafkaFuture<Config>> allFutures = new HashMap<>();
         allFutures.putAll(brokerFutures);
-        return new DescribeConfigsResult(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures));
+        allFutures.putAll(unifiedRequestFutures);
+        return new DescribeConfigsResult(allFutures);
     }
 
     private Resource configResourceToResource(ConfigResource configResource) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2a328da/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d41df48..96f7e8a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -65,6 +65,7 @@ import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.requests.ResourceType.TOPIC;
+import static org.apache.kafka.common.requests.ResourceType.BROKER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -357,6 +358,22 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testDescribeConfigs() throws Exception {
+        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+            env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
+                Collections.singletonMap(new org.apache.kafka.common.requests.Resource(BROKER, "0"),
+                    new DescribeConfigsResponse.Config(ApiError.NONE,
+                        Collections.<DescribeConfigsResponse.ConfigEntry>emptySet()))));
+            DescribeConfigsResult result2 = env.adminClient().describeConfigs(Collections.singleton(
+                new ConfigResource(ConfigResource.Type.BROKER, "0")));
+            result2.all().get();
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, collection.contains(element));