You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/11 11:08:27 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

mimaison commented on a change in pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#discussion_r438708511



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1916,129 +1917,96 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
-        final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
-
-        // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
-        // request for every BROKER resource we want to describe.
-        final Collection<ConfigResource> brokerResources = new ArrayList<>();
-
-        // The non-BROKER resources which we want to describe.  These resources can be described by a
-        // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        // Partition the requested config resources based on which broker they must be sent to with the
+        // null broker being used for config resources which can be obtained from any broker
+        final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (dependsOnSpecificNode(resource)) {
-                brokerFutures.put(resource, new KafkaFutureImpl<>());
-                brokerResources.add(resource);
-            } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
-            }
+            Integer broker = nodeFor(resource);
+            brokerFutures.compute(broker, (key, value) -> {
+                if (value == null) {
+                    // Only BROKER and BROKER_LOGGER configs are broker-specific
+                    value = new HashMap<>(broker != null ? 2 : 16);

Review comment:
       I wonder if we can just get a Map with the default size. I don't expect this code path to be very hot

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1916,129 +1917,96 @@ void handleFailure(Throwable throwable) {
 
     @Override
     public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Config>> unifiedRequestFutures = new HashMap<>();
-        final Map<ConfigResource, KafkaFutureImpl<Config>> brokerFutures = new HashMap<>(configResources.size());
-
-        // The BROKER resources which we want to describe.  We must make a separate DescribeConfigs
-        // request for every BROKER resource we want to describe.
-        final Collection<ConfigResource> brokerResources = new ArrayList<>();
-
-        // The non-BROKER resources which we want to describe.  These resources can be described by a
-        // single, unified DescribeConfigs request.
-        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
+        // Partition the requested config resources based on which broker they must be sent to with the
+        // null broker being used for config resources which can be obtained from any broker
+        final Map<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> brokerFutures = new HashMap<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (dependsOnSpecificNode(resource)) {
-                brokerFutures.put(resource, new KafkaFutureImpl<>());
-                brokerResources.add(resource);
-            } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-                unifiedRequestResources.add(resource);
-            }
+            Integer broker = nodeFor(resource);
+            brokerFutures.compute(broker, (key, value) -> {
+                if (value == null) {
+                    // Only BROKER and BROKER_LOGGER configs are broker-specific
+                    value = new HashMap<>(broker != null ? 2 : 16);
+                }
+                value.put(resource, new KafkaFutureImpl<>());
+                return value;
+            });
         }
 
         final long now = time.milliseconds();
-        if (!unifiedRequestResources.isEmpty()) {
+        for (Map.Entry<Integer, Map<ConfigResource, KafkaFutureImpl<Config>>> entry : brokerFutures.entrySet()) {
+            Integer broker = entry.getKey();
+            Map<ConfigResource, KafkaFutureImpl<Config>> unified = entry.getValue();
+
             runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+                broker != null ? new ConstantNodeIdProvider(broker) : new LeastLoadedNodeProvider()) {
 
                 @Override
                 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(unifiedRequestResources)
-                            .includeSynonyms(options.includeSynonyms())
-                            .includeDocumentation(options.includeDocumentation());
+                    return new DescribeConfigsRequest.Builder(new DescribeConfigsRequestData()
+                        .setResources(unified.keySet().stream()
+                            .map(config ->
+                                new DescribeConfigsRequestData.DescribeConfigsResource()
+                                    .setResourceName(config.name())
+                                    .setResourceType(config.type().id()))
+                            .collect(Collectors.toList()))
+                        .setIncludeSynonyms(options.includeSynonyms())
+                        .setIncludeDocumentation(options.includeDocumentation()));
                 }
 
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
                     DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                    for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : unifiedRequestFutures.entrySet()) {
+                    for (Map.Entry<ConfigResource, DescribeConfigsResponseData.DescribeConfigsResult> entry : response.resultMap().entrySet()) {
                         ConfigResource configResource = entry.getKey();
-                        KafkaFutureImpl<Config> future = entry.getValue();
-                        DescribeConfigsResponse.Config config = response.config(configResource);
-                        if (config == null) {
-                            future.completeExceptionally(new UnknownServerException(
-                                "Malformed broker response: missing config for " + configResource));
-                            continue;
-                        }
-                        if (config.error().isFailure()) {
-                            future.completeExceptionally(config.error().exception());
-                            continue;
-                        }
-                        List<ConfigEntry> configEntries = new ArrayList<>();
-                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                            configEntries.add(new ConfigEntry(configEntry.name(),
-                                    configEntry.value(), configSource(configEntry.source()),
-                                    configEntry.isSensitive(), configEntry.isReadOnly(),
-                                    configSynonyms(configEntry), configType(configEntry.type()),
-                                    configEntry.documentation()));
+                        DescribeConfigsResponseData.DescribeConfigsResult describeConfigsResult = entry.getValue();
+                        KafkaFutureImpl<Config> future = unified.get(configResource);
+                        if (future == null) {
+                            if (broker != null) {
+                                log.warn("The config {} in the response from broker {} is not in the request",
+                                        configResource, broker);
+                            } else {
+                                log.warn("The config {} in the response from the least loaded broker is not in the request",
+                                        configResource);
+                            }
+                        } else {
+                            if (describeConfigsResult.errorCode() != Errors.NONE.code()) {
+                                future.completeExceptionally(Errors.forCode(describeConfigsResult.errorCode())
+                                        .exception(describeConfigsResult.errorMessage()));
+                            } else {
+                                future.complete(describeConfigResult(describeConfigsResult));
+                            }
                         }
-                        future.complete(new Config(configEntries));
                     }
+                    completeUnrealizedFutures(
+                        unified.entrySet().stream(),
+                        configResource -> "The broker response did not contain a result for config resource " + configResource);
                 }
 
                 @Override
                 void handleFailure(Throwable throwable) {
-                    completeAllExceptionally(unifiedRequestFutures.values(), throwable);
+                    completeAllExceptionally(unified.values(), throwable);
                 }
             }, now);
         }
 
-        for (Map.Entry<ConfigResource, KafkaFutureImpl<Config>> entry : brokerFutures.entrySet()) {
-            final KafkaFutureImpl<Config> brokerFuture = entry.getValue();
-            final ConfigResource resource = entry.getKey();
-            final int nodeId = Integer.parseInt(resource.name());
-            runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()),
-                    new ConstantNodeIdProvider(nodeId)) {
-
-                @Override
-                DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
-                            .includeSynonyms(options.includeSynonyms())
-                            .includeDocumentation(options.includeDocumentation());
-                }
-
-                @Override
-                void handleResponse(AbstractResponse abstractResponse) {
-                    DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
-                    DescribeConfigsResponse.Config config = response.configs().get(resource);
-
-                    if (config == null) {
-                        brokerFuture.completeExceptionally(new UnknownServerException(
-                            "Malformed broker response: missing config for " + resource));
-                        return;
-                    }
-                    if (config.error().isFailure())
-                        brokerFuture.completeExceptionally(config.error().exception());
-                    else {
-                        List<ConfigEntry> configEntries = new ArrayList<>();
-                        for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
-                                configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
-                                configSynonyms(configEntry), configType(configEntry.type()), configEntry.documentation()));

Review comment:
       It looks like `configType()` is not used anymore. Can we delete it?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2568,25 +2568,32 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
     val describeConfigsRequest = request.body[DescribeConfigsRequest]
-    val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.toBuffer.partition { resource =>
-      resource.`type` match {
+    val (authorizedResources, unauthorizedResources) = describeConfigsRequest.data.resources.asScala.toBuffer.partition { resource =>
+      ConfigResource.Type.forId(resource.resourceType) match {
         case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
           authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
-          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
-        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
+          authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.resourceName)
+        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
       }
     }
-    val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
-      resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
-    }.toMap, describeConfigsRequest.includeSynonyms, describeConfigsRequest.includeDocumentation)
+    val authorizedConfigs = adminManager.describeConfigs(authorizedResources.toList, describeConfigsRequest.data.includeSynonyms, describeConfigsRequest.data.includeDocumentation)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
-      val error = configsAuthorizationApiError(resource)
-      resource -> new DescribeConfigsResponse.Config(error, util.Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+      val error = ConfigResource.Type.forId(resource.resourceType) match {
+        case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
+        case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
+        case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.resourceName}")
+      }
+      new DescribeConfigsResponseData.DescribeConfigsResult().setErrorCode(error.code)
+        .setErrorMessage(error.message)
+        .setConfigs(util.Collections.emptyList[DescribeConfigsResponseData.DescribeConfigsResourceResult])

Review comment:
       We can drop `util` here, we are importing `import java.util.{Collections, Optional}`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org