You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/08/02 18:52:00 UTC
[kafka] branch trunk updated: KAFKA-7800; Dynamic log levels admin API (KIP-412)
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a99e011 KAFKA-7800; Dynamic log levels admin API (KIP-412)
a99e011 is described below
commit a99e0111114d1cb8c762494ac195cf84e6425bb3
Author: Stanislav Kozlovski <fa...@windowslive.com>
AuthorDate: Fri Aug 2 11:51:35 2019 -0700
KAFKA-7800; Dynamic log levels admin API (KIP-412)
<!--
Are there any breaking changes? If so, this is a major release; make sure '#major' is in at least one
commit message to get CI to bump the major version. This will prevent automatic downstream dependency
bumping / consuming. For more information about semantic versioning, see: https://semver.org/
Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block.
-->
What
----
<!--
Briefly describe **what** you have changed and **why**.
Optionally include implementation strategy.
-->
References
----------
[**KIP-412**](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels)
[**KAFKA-7800**](https://issues.apache.org/jira/browse/KAFKA-7800)
[**Discussion Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201901.mbox/%3CCANZZNGyeVw8q%3Dx9uOQS-18wL3FEmnOwpBnpJ9x3iMLdXY3gEug%40mail.gmail.com%3E)
[**Vote Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201902.mbox/%3CCANZZNGzpTJg5YX1Gpe5S%3DHSr%3DXGvmxvYLTdA3jWq_qwH-UvorQ%40mail.gmail.com%3E)
<!--
Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations...
For code bumps: link to PR, tag or GitHub `/compare/master...master`
-->
Test&Review
------------
Test cases covered:
* DescribeConfigs
* Alter the log level with and without validateOnly, validate the results with DescribeConfigs
Open questions / Follow ups
--------------------------
If you're a reviewer, I'd appreciate your thoughts on these questions I have open:
1. Should we add synchronization to the Log4jController methods? - Seems like we don't get much value from it
2. Should we instantiate a new Log4jController instead of it having static methods? - All operations are stateless, so I thought static methods would do well
3. A logger which does not have a set value returns "null" (as seen in the unit tests). Should we just return the Root logger's level?
Author: Stanislav Kozlovski <fa...@windowslive.com>
Reviewers: Gwen Shapira
Closes #6903 from stanislavkozlovski/KAFKA-7800-dynamic-log-levels-admin-ap
---
.../apache/kafka/clients/admin/ConfigEntry.java | 1 +
.../kafka/clients/admin/KafkaAdminClient.java | 17 +-
.../apache/kafka/common/config/ConfigResource.java | 2 +-
.../apache/kafka/common/config/LogLevelConfig.java | 71 +++++++
.../common/requests/DescribeConfigsResponse.java | 3 +-
.../src/main/scala/kafka/admin/ConfigCommand.scala | 138 +++++++++----
.../src/main/scala/kafka/server/AdminManager.scala | 69 ++++++-
core/src/main/scala/kafka/server/KafkaApis.scala | 8 +-
.../main/scala/kafka/utils/Log4jController.scala | 92 ++++++---
.../kafka/api/AdminClientIntegrationTest.scala | 229 ++++++++++++++++++++-
.../kafka/api/AuthorizerIntegrationTest.scala | 34 ++-
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 164 +++++++++++++--
12 files changed, 719 insertions(+), 109 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 7775b6a..42cc627 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -189,6 +189,7 @@ public class ConfigEntry {
*/
public enum ConfigSource {
DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic
+ DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker
DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker
DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker config that is configured as default for all brokers in the cluster
STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 227a03b..8092eec 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1762,7 +1762,7 @@ public class KafkaAdminClient extends AdminClient {
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size());
for (ConfigResource resource : configResources) {
- if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+ if (dependsOnSpecificNode(resource)) {
brokerFutures.put(resource, new KafkaFutureImpl<>());
brokerResources.add(resource);
} else {
@@ -1887,6 +1887,9 @@ public class KafkaAdminClient extends AdminClient {
case STATIC_BROKER_CONFIG:
configSource = ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
break;
+ case DYNAMIC_BROKER_LOGGER_CONFIG:
+ configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG;
+ break;
case DEFAULT_CONFIG:
configSource = ConfigEntry.ConfigSource.DEFAULT_CONFIG;
break;
@@ -1906,7 +1909,7 @@ public class KafkaAdminClient extends AdminClient {
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
- if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+ if (dependsOnSpecificNode(resource)) {
NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
@@ -1971,7 +1974,7 @@ public class KafkaAdminClient extends AdminClient {
final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
for (ConfigResource resource : configs.keySet()) {
- if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+ if (dependsOnSpecificNode(resource)) {
NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
} else
@@ -3070,4 +3073,12 @@ public class KafkaAdminClient extends AdminClient {
return new ElectLeadersResult(electionFuture);
}
+
+ /**
+ * Returns a boolean indicating whether the resource needs to go to a specific node
+ */
+ private boolean dependsOnSpecificNode(ConfigResource resource) {
+ return (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault())
+ || resource.type() == ConfigResource.Type.BROKER_LOGGER;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index 5343a6b..8870238 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -33,7 +33,7 @@ public final class ConfigResource {
* Type of resource.
*/
public enum Type {
- BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
+ BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap(
Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity()))
diff --git a/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java
new file mode 100644
index 0000000..fe7e2eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.config;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * This class holds definitions for log level configurations related to Kafka's application logging. See KIP-412 for additional information
+ */
+public class LogLevelConfig {
+ /*
+ * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+ */
+
+ /**
+ * The <code>FATAL</code> level designates a very severe error
+ * that will lead the Kafka broker to abort.
+ */
+ public static final String FATAL_LOG_LEVEL = "FATAL";
+
+ /**
+ * The <code>ERROR</code> level designates error events that
+ * might still allow the broker to continue running.
+ */
+ public static final String ERROR_LOG_LEVEL = "ERROR";
+
+ /**
+ * The <code>WARN</code> level designates potentially harmful situations.
+ */
+ public static final String WARN_LOG_LEVEL = "WARN";
+
+ /**
+ * The <code>INFO</code> level designates informational messages
+ * that highlight normal Kafka events at a coarse-grained level
+ */
+ public static final String INFO_LOG_LEVEL = "INFO";
+
+ /**
+ * The <code>DEBUG</code> level designates fine-grained
+ * informational events that are most useful to debug Kafka
+ */
+ public static final String DEBUG_LOG_LEVEL = "DEBUG";
+
+ /**
+ * The <code>TRACE</code> level designates finer-grained
+ * informational events than the <code>DEBUG</code> level.
+ */
+ public static final String TRACE_LOG_LEVEL = "TRACE";
+
+ public static final Set<String> VALID_LOG_LEVELS = new HashSet<>(Arrays.asList(
+ FATAL_LOG_LEVEL, ERROR_LOG_LEVEL, WARN_LOG_LEVEL,
+ INFO_LOG_LEVEL, DEBUG_LOG_LEVEL, TRACE_LOG_LEVEL
+ ));
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 51c35d5..6d424f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -179,7 +179,8 @@ public class DescribeConfigsResponse extends AbstractResponse {
DYNAMIC_BROKER_CONFIG((byte) 2),
DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
STATIC_BROKER_CONFIG((byte) 4),
- DEFAULT_CONFIG((byte) 5);
+ DEFAULT_CONFIG((byte) 5),
+ DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6);
final byte id;
private static final ConfigSource[] VALUES = values();
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 7edc4a4..781cc1a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,8 +28,8 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig}
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.common.security.JaasUtils
@@ -41,7 +41,6 @@ import scala.collection._
/**
- * This script can be used to change configs for topics/clients/brokers dynamically
* This script can be used to change configs for topics/clients/users/brokers dynamically
* An entity described or altered by the command may be one of:
* <ul>
@@ -50,12 +49,15 @@ import scala.collection._
* <li> user: --entity-type users --entity-name <user-principal>
* <li> <user, client>: --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id>
* <li> broker: --entity-type brokers --entity-name <broker>
+ * <li> broker-logger: --entity-type broker-loggers --entity-name <broker>
* </ul>
* --entity-default may be used instead of --entity-name when describing or altering default configuration for users and clients.
*
*/
object ConfigCommand extends Config {
+ val BrokerLoggerConfigType = "broker-loggers"
+ val BrokerSupportedConfigTypes = Seq(ConfigType.Broker, BrokerLoggerConfigType)
val DefaultScramIterations = 4096
// Dynamic broker configs can only be updated using the new AdminClient once brokers have started
// so that configs may be fully validated. Prior to starting brokers, updates may be performed using
@@ -274,49 +276,61 @@ object ConfigCommand extends Config {
val adminClient = JAdminClient.create(props)
val entityName = if (opts.options.has(opts.entityName))
opts.options.valueOf(opts.entityName)
- else if (opts.options.has(opts.entityDefault))
+ else // default entity
""
- else
- throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
val entityTypes = opts.options.valuesOf(opts.entityType).asScala
if (entityTypes.size != 1)
- throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server")
- if (entityTypes.head != ConfigType.Broker)
- throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes")
+ throw new IllegalArgumentException(s"Exactly one --entity-type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server")
try {
if (opts.options.has(opts.alterOpt))
- alterBrokerConfig(adminClient, opts, entityName)
+ alterBrokerConfig(adminClient, opts, entityTypes.head, entityName)
else if (opts.options.has(opts.describeOpt))
- describeBrokerConfig(adminClient, opts, entityName)
+ describeBrokerConfig(adminClient, opts, entityTypes.head, entityName)
} finally {
adminClient.close()
}
}
- private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
+ private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
+ entityType: String, entityName: String) {
val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) }
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
- // compile the final set of configs
- val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
- val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
+ if (entityType == ConfigType.Broker) {
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName)
+ val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false)
.map { entry => (entry.name, entry) }.toMap
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ // fail the command if any of the configs to be deleted does not exist
+ val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+ if (invalidConfigs.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
+
+ val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
+ val sensitiveEntries = newEntries.filter(_._2.value == null)
+ if (sensitiveEntries.nonEmpty)
+ throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
+ val newConfig = new JConfig(newEntries.asJava.values)
+
+ val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ } else if (entityType == BrokerLoggerConfigType) {
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+ val validLoggers = brokerLoggerConfigs(adminClient, entityName).map(_.name)
+ // fail the command if any of the configured broker loggers do not exist
+ val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
+ if (invalidBrokerLoggers.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}")
+
+ val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+
+ adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
+ }
if (entityName.nonEmpty)
println(s"Completed updating config for broker: $entityName.")
@@ -324,8 +338,13 @@ object ConfigCommand extends Config {
println(s"Completed updating default config for brokers in the cluster,")
}
- private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) {
- val configs = brokerConfig(adminClient, entityName, includeSynonyms = true)
+ private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions,
+ entityType: String, entityName: String) {
+ val configs = if (entityType == ConfigType.Broker)
+ brokerConfig(adminClient, entityName, includeSynonyms = true)
+ else // broker logger
+ brokerLoggerConfigs(adminClient, entityName)
+
if (entityName.nonEmpty)
println(s"Configs for broker $entityName are:")
else
@@ -349,6 +368,15 @@ object ConfigCommand extends Config {
.toSeq
}
+ /**
+ * Returns all the valid broker logger configurations
+ */
+ private def brokerLoggerConfigs(adminClient: Admin, entityName: String): Seq[ConfigEntry] = {
+ val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName)
+ val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
+ configs.get(configResource).entries.asScala.toSeq
+ }
+
case class Entity(entityType: String, sanitizedName: Option[String]) {
val entityPath = sanitizedName match {
case Some(n) => entityType + "/" + n
@@ -445,7 +473,7 @@ object ConfigCommand extends Config {
if (opts.options.has(opts.alterOpt) && names.size != types.size)
throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter")
- val reverse = types.size == 2 && types(0) == ConfigType.Client
+ val reverse = types.size == 2 && types.head == ConfigType.Client
val entityTypes = if (reverse) types.reverse else types
val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator
@@ -483,7 +511,7 @@ object ConfigCommand extends Config {
.ofType(classOf[String])
val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
val describeOpt = parser.accepts("describe", "List configs for the given entity.")
- val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)")
+ val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)")
.withRequiredArg
.ofType(classOf[String])
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)")
@@ -514,36 +542,58 @@ object ConfigCommand extends Config {
val actions = Seq(alterOpt, describeOpt).count(options.has _)
if(actions != 1)
CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter")
-
// check required args
CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt))
CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig))
+
val entityTypeVals = options.valuesOf(entityType).asScala
+ val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt))
+ (BrokerSupportedConfigTypes, "--bootstrap-server")
+ else
+ (ConfigType.all, "--zookeeper")
+
+ entityTypeVals.foreach(entityTypeVal =>
+ if (!allowedEntityTypes.contains(entityTypeVal))
+ throw new IllegalArgumentException(s"Invalid entity-type $entityTypeVal, --entity-type must be one of ${allowedEntityTypes.mkString(",")} with the $connectOptString argument")
+ )
+ if (entityTypeVals.isEmpty)
+ throw new IllegalArgumentException("At least one --entity-type must be specified")
+ else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
+ throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together")
- if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt))
+ if (!options.has(bootstrapServerOpt) && !options.has(zkConnectOpt))
+ throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified")
+ else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt))
throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")
+ else if (options.has(bootstrapServerOpt) && !options.has(entityName) && !options.has(entityDefault))
+ throw new IllegalArgumentException(s"At least one of --entity-name or --entity-default must be specified with --bootstrap-server")
+
+ if (options.has(entityName) && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
+ val brokerId = options.valueOf(entityName)
+ try brokerId.toInt catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid integer broker id , but it is: $brokerId")
+ }
+ }
+
if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User))
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType)
- if(options.has(alterOpt)) {
+
+ if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !options.has(entityName))
+ throw new IllegalArgumentException(s"--entity-name must be specified with --describe of ${entityTypeVals.mkString(",")}")
+
+ if (options.has(alterOpt)) {
if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) {
if (!options.has(entityName) && !options.has(entityDefault))
throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers")
} else if (!options.has(entityName))
- throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}")
+ throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals.mkString(",")}")
val isAddConfigPresent: Boolean = options.has(addConfig)
val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
if(! isAddConfigPresent && ! isDeleteConfigPresent)
throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
}
- entityTypeVals.foreach(entityTypeVal =>
- if (!ConfigType.all.contains(entityTypeVal))
- throw new IllegalArgumentException(s"Invalid entity-type ${entityTypeVal}, --entity-type must be one of ${ConfigType.all}")
- )
- if (entityTypeVals.isEmpty)
- throw new IllegalArgumentException("At least one --entity-type must be specified")
- else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
- throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together")
}
}
}
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 1ed55fb..1daaeec 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -21,13 +21,14 @@ import java.util.{Collections, Properties}
import kafka.admin.{AdminOperationException, AdminUtils}
import kafka.common.TopicAlreadyMarkedForDeletionException
import kafka.log.LogConfig
+import kafka.utils.Log4jController
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
@@ -347,8 +348,16 @@ class AdminManager(val config: KafkaConfig,
createResponseConfig(allConfigs(config),
createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms))
else
- throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $resource.name")
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}")
+ case ConfigResource.Type.BROKER_LOGGER =>
+ if (resource.name == null || resource.name.isEmpty)
+ throw new InvalidRequestException("Broker id must not be empty")
+ else if (resourceNameToBrokerId(resource.name) != config.brokerId)
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.name}")
+ else
+ createResponseConfig(Log4jController.loggers,
+ (name, value) => new DescribeConfigsResponse.ConfigEntry(name, value.toString, ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, false, false, List.empty.asJava))
case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType")
}
resource -> resourceConfig
@@ -428,13 +437,24 @@ class AdminManager(val config: KafkaConfig,
resource -> ApiError.NONE
}
+ private def alterLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = {
+ alterConfigOps.foreach { alterConfigOp =>
+ val loggerName = alterConfigOp.configEntry().name()
+ val logLevel = alterConfigOp.configEntry().value()
+ alterConfigOp.opType() match {
+ case OpType.SET => Log4jController.logLevel(loggerName, logLevel)
+ case OpType.DELETE => Log4jController.unsetLogLevel(loggerName)
+ }
+ }
+ }
+
private def getBrokerId(resource: ConfigResource) = {
if (resource.name == null || resource.name.isEmpty)
None
else {
val id = resourceNameToBrokerId(resource.name)
if (id != this.config.brokerId)
- throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name")
+ throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}")
Some(id)
}
}
@@ -451,7 +471,7 @@ class AdminManager(val config: KafkaConfig,
def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = {
configs.map { case (resource, alterConfigOps) =>
try {
- //throw InvalidRequestException if any duplicate keys
+ // throw InvalidRequestException if any duplicate keys
val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name())
.mapValues(_.size).filter(_._2 > 1).keys.toSet
if (duplicateKeys.nonEmpty)
@@ -475,6 +495,14 @@ class AdminManager(val config: KafkaConfig,
val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys)
alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap)
+
+ case ConfigResource.Type.BROKER_LOGGER =>
+ getBrokerId(resource)
+ validateLogLevelConfigs(alterConfigOps)
+
+ if (!validateOnly)
+ alterLogLevelConfigs(alterConfigOps)
+ resource -> ApiError.NONE
case resourceType =>
throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
}
@@ -495,6 +523,35 @@ class AdminManager(val config: KafkaConfig,
}.toMap
}
+ private def validateLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = {
+ def validateLoggerNameExists(loggerName: String): Unit = {
+ if (!Log4jController.loggerExists(loggerName))
+ throw new ConfigException(s"Logger $loggerName does not exist!")
+ }
+
+ alterConfigOps.foreach { alterConfigOp =>
+ val loggerName = alterConfigOp.configEntry().name()
+ alterConfigOp.opType() match {
+ case OpType.SET =>
+ validateLoggerNameExists(loggerName)
+ val logLevel = alterConfigOp.configEntry().value()
+ if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) {
+ val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ")
+ throw new ConfigException(
+ s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " +
+ s"Valid log levels are $validLevelsStr"
+ )
+ }
+ case OpType.DELETE =>
+ validateLoggerNameExists(loggerName)
+ if (loggerName == Log4jController.ROOT_LOGGER)
+ throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed")
+ case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
+ case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource")
+ }
+ }
+ }
+
private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = {
def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = {
@@ -512,14 +569,14 @@ class AdminManager(val config: KafkaConfig,
if (!listType(alterConfigOp.configEntry().name(), configKeys))
throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry().name()}")
val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
- val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList
+ val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList
configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
}
case OpType.SUBTRACT => {
if (!listType(alterConfigOp.configEntry().name(), configKeys))
throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry().name()}")
val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList
- val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList)
+ val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList)
configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(","))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3ec6b23..a88cd92 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2288,6 +2288,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val alterConfigsRequest = request.body[AlterConfigsRequest]
val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) =>
resource.`type` match {
+ case ConfigResource.Type.BROKER_LOGGER =>
+ throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}")
case ConfigResource.Type.BROKER =>
authorize(request.session, AlterConfigs, Resource.ClusterResource)
case ConfigResource.Type.TOPIC =>
@@ -2331,7 +2333,7 @@ class KafkaApis(val requestChannel: RequestChannel,
private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = {
val error = resource.`type` match {
- case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED
+ case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED
case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
}
@@ -2349,7 +2351,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) =>
resource.`type` match {
- case ConfigResource.Type.BROKER =>
+ case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
authorize(request.session, AlterConfigs, Resource.ClusterResource)
case ConfigResource.Type.TOPIC =>
authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL))
@@ -2370,7 +2372,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val describeConfigsRequest = request.body[DescribeConfigsRequest]
val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource =>
resource.`type` match {
- case ConfigResource.Type.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
+ case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => authorize(request.session, DescribeConfigs, Resource.ClusterResource)
case ConfigResource.Type.TOPIC =>
authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL))
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}")
diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala
index 95d0733..ba0649c 100755
--- a/core/src/main/scala/kafka/utils/Log4jController.scala
+++ b/core/src/main/scala/kafka/utils/Log4jController.scala
@@ -22,69 +22,95 @@ import java.util.Locale
import org.apache.log4j.{Level, LogManager, Logger}
+import scala.collection.mutable
+import scala.collection.JavaConverters._
-/**
- * An MBean that allows the user to dynamically alter log4j levels at runtime.
- * The companion object contains the singleton instance of this class and
- * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
- * of the companion object.
- */
-private class Log4jController extends Log4jControllerMBean {
- def getLoggers = {
- val lst = new util.ArrayList[String]()
- lst.add("root=" + existingLogger("root").getLevel.toString)
+object Log4jController {
+ val ROOT_LOGGER = "root"
+
+ /**
+ * Returns a map of the log4j loggers and their assigned log level.
+ * If a logger does not have a log level assigned, we return the root logger's log level
+ */
+ def loggers: mutable.Map[String, String] = {
+ val logs = new mutable.HashMap[String, String]()
+ val rootLoggerLvl = existingLogger(ROOT_LOGGER).getLevel.toString
+ logs.put(ROOT_LOGGER, rootLoggerLvl)
+
val loggers = LogManager.getCurrentLoggers
while (loggers.hasMoreElements) {
val logger = loggers.nextElement().asInstanceOf[Logger]
if (logger != null) {
- val level = if (logger != null) logger.getLevel else null
- lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
+ val level = if (logger.getLevel != null) logger.getLevel.toString else rootLoggerLvl
+ logs.put(logger.getName, level)
}
}
- lst
+ logs
}
+ /**
+ * Sets the log level of a particular logger
+ */
+ def logLevel(loggerName: String, logLevel: String): Boolean = {
+ val log = existingLogger(loggerName)
+ if (!loggerName.trim.isEmpty && !logLevel.trim.isEmpty && log != null) {
+ log.setLevel(Level.toLevel(logLevel.toUpperCase(Locale.ROOT)))
+ true
+ }
+ else false
+ }
- private def newLogger(loggerName: String) =
- if (loggerName == "root")
- LogManager.getRootLogger
- else LogManager.getLogger(loggerName)
+ def unsetLogLevel(loggerName: String): Boolean = {
+ val log = existingLogger(loggerName)
+ if (!loggerName.trim.isEmpty && log != null) {
+ log.setLevel(null)
+ true
+ }
+ else false
+ }
+ def loggerExists(loggerName: String): Boolean = existingLogger(loggerName) != null
private def existingLogger(loggerName: String) =
- if (loggerName == "root")
+ if (loggerName == ROOT_LOGGER)
LogManager.getRootLogger
else LogManager.exists(loggerName)
+}
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
+ * of the companion object.
+ */
+class Log4jController extends Log4jControllerMBean {
- def getLogLevel(loggerName: String) = {
- val log = existingLogger(loggerName)
+ def getLoggers: util.List[String] = {
+ Log4jController.loggers.map {
+ case (logger, level) => s"$logger=$level"
+ }.toList.asJava
+ }
+
+
+ def getLogLevel(loggerName: String): String = {
+ val log = Log4jController.existingLogger(loggerName)
if (log != null) {
val level = log.getLevel
if (level != null)
log.getLevel.toString
- else "Null log level."
+ else
+ Log4jController.existingLogger(Log4jController.ROOT_LOGGER).getLevel.toString
}
else "No such logger."
}
-
- def setLogLevel(loggerName: String, level: String) = {
- val log = newLogger(loggerName)
- if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
- log.setLevel(Level.toLevel(level.toUpperCase(Locale.ROOT)))
- true
- }
- else false
- }
-
+ def setLogLevel(loggerName: String, level: String): Boolean = Log4jController.logLevel(loggerName, level)
}
-private trait Log4jControllerMBean {
+trait Log4jControllerMBean {
def getLoggers: java.util.List[String]
def getLogLevel(logger: String): String
def setLogLevel(logger: String, level: String): Boolean
}
-
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 7f04de1..ff8e379 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -24,12 +24,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.{Collections, Properties}
import java.{time, util}
+
import kafka.log.LogConfig
import kafka.security.auth.{Cluster, Group, Topic}
import kafka.server.{Defaults, KafkaConfig, KafkaServer}
import kafka.utils.Implicits._
import kafka.utils.TestUtils._
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Log4jController, Logging, TestUtils}
import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
@@ -40,7 +41,7 @@ import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.TopicPartitionReplica
import org.apache.kafka.common.acl._
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse}
import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType}
@@ -49,6 +50,7 @@ import org.junit.Assert._
import org.junit.rules.Timeout
import org.junit.{After, Before, Rule, Test}
import org.scalatest.Assertions.intercept
+
import scala.collection.JavaConverters._
import scala.collection.Seq
import scala.compat.java8.OptionConverters._
@@ -68,6 +70,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
def globalTimeout = Timeout.millis(120000)
var client: Admin = null
+ var brokerLoggerConfigResource: ConfigResource = null
+ var changedBrokerLoggers = scala.collection.mutable.Set[String]()
val topic = "topic"
val partition = 0
@@ -77,10 +81,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
override def setUp(): Unit = {
super.setUp
TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ brokerLoggerConfigResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString)
}
@After
override def tearDown(): Unit = {
+ teardownBrokerLoggers()
if (client != null)
Utils.closeQuietly(client, "AdminClient")
super.tearDown()
@@ -1819,6 +1825,225 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
classOf[InvalidTopicException])
client.close()
}
+
+ @Test
+ def testDescribeConfigsForLog4jLogLevels(): Unit = {
+ client = AdminClient.create(createConfig())
+
+ val loggerConfig = describeBrokerLoggers()
+ val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()
+ val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
+ assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger
+ assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name())
+ assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source())
+ assertEquals(false, logCleanerLogLevelConfig.isReadOnly)
+ assertEquals(false, logCleanerLogLevelConfig.isSensitive)
+ assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty)
+ }
+
+ @Test
+ def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = {
+ client = AdminClient.create(createConfig())
+
+ val initialLoggerConfig = describeBrokerLoggers()
+ val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()
+ assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
+ assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value())
+ assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value())
+
+ val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL
+ val alterRootLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+ ).asJavaCollection
+ // Test validateOnly does not change anything
+ alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true)
+ val validatedLoggerConfig = describeBrokerLoggers()
+ assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value())
+ assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value())
+ assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value())
+ assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+ // test that we can change them and unset loggers still use the root's log level
+ alterBrokerLoggers(alterRootLoggerEntry)
+ val changedRootLoggerConfig = describeBrokerLoggers()
+ assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value())
+ assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value())
+ assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value())
+ assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+ // alter the ZK client's logger so we can later test resetting it
+ val alterZKLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET)
+ ).asJavaCollection
+ alterBrokerLoggers(alterZKLoggerEntry)
+ val changedZKLoggerConfig = describeBrokerLoggers()
+ assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+
+ // properly test various set operations and one delete
+ val alterLogLevelsEntries = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), AlterConfigOp.OpType.DELETE) // should reset to the root logger level
+ ).asJavaCollection
+ alterBrokerLoggers(alterLogLevelsEntries)
+ val alteredLoggerConfig = describeBrokerLoggers()
+ assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value())
+ assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value())
+ assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value())
+ assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value())
+ }
+
+ /**
+ * 1. Assume ROOT logger == TRACE
+ * 2. Change kafka.controller.KafkaController logger to INFO
+ * 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE)
+ * 4. Change ROOT logger to ERROR
+ * 5. Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level)
+ */
+ @Test
+ def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = {
+ client = AdminClient.create(createConfig())
+ // step 1 - configure root logger
+ val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL
+ val alterRootLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET)
+ ).asJavaCollection
+ alterBrokerLoggers(alterRootLoggerEntry)
+ val initialLoggerConfig = describeBrokerLoggers()
+ assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value())
+
+ // step 2 - change KafkaController logger to INFO
+ val alterControllerLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET)
+ ).asJavaCollection
+ alterBrokerLoggers(alterControllerLoggerEntry)
+ val changedControllerLoggerConfig = describeBrokerLoggers()
+ assertEquals(initialRootLogLevel, changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+
+ // step 3 - unset KafkaController logger
+ val deleteControllerLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE)
+ ).asJavaCollection
+ alterBrokerLoggers(deleteControllerLoggerEntry)
+ val deletedControllerLoggerConfig = describeBrokerLoggers()
+ assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value())
+
+ val newRootLogLevel = LogLevelConfig.ERROR_LOG_LEVEL
+ val newAlterRootLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET)
+ ).asJavaCollection
+ alterBrokerLoggers(newAlterRootLoggerEntry)
+ val newRootLoggerConfig = describeBrokerLoggers()
+ assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value())
+ assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value())
+ }
+
+ @Test
+ def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = {
+ client = AdminClient.create(createConfig())
+ val deleteRootLoggerEntry = Seq(
+ new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE)
+ ).asJavaCollection
+
+ assertTrue(intercept[ExecutionException](alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException])
+ }
+
+ @Test
+ def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = {
+ client = AdminClient.create(createConfig())
+ val validLoggerName = "kafka.server.KafkaRequestHandler"
+ val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName)
+ def assertLogLevelDidNotChange(): Unit = {
+ assertEquals(
+ expectedValidLoggerLogLevel,
+ describeBrokerLoggers().get(validLoggerName)
+ )
+ }
+
+ val appendLogLevelEntries = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+ new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported
+ ).asJavaCollection
+ assertTrue(intercept[ExecutionException](alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+ assertLogLevelDidNotChange()
+
+ val subtractLogLevelEntries = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+ new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported
+ ).asJavaCollection
+ assertTrue(intercept[ExecutionException](alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+ assertLogLevelDidNotChange()
+
+ val invalidLogLevelLogLevelEntries = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+ new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level
+ ).asJavaCollection
+ assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+ assertLogLevelDidNotChange()
+
+ val invalidLoggerNameLogLevelEntries = Seq(
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid
+ new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported
+ ).asJavaCollection
+ assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException])
+ assertLogLevelDidNotChange()
+ }
+
+ /**
+ * The AlterConfigs API is deprecated and should not support altering log levels
+ */
+ @Test
+ def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = {
+ client = AdminClient.create(createConfig())
+
+ val alterLogLevelsEntries = Seq(
+ new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL)
+ ).asJavaCollection
+ val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava)
+ assertTrue(intercept[ExecutionException](alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException])
+ }
+
+ def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = {
+ if (!validateOnly) {
+ for (entry <- entries.asScala)
+ changedBrokerLoggers.add(entry.configEntry().name())
+ }
+
+ client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly))
+ .values.get(brokerLoggerConfigResource).get()
+ }
+
+ def describeBrokerLoggers(): Config =
+ client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get()
+
+ /**
+ * Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes.
+ * We need to clean up the changes done while testing.
+ */
+ def teardownBrokerLoggers(): Unit = {
+ if (changedBrokerLoggers.nonEmpty) {
+ val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name().equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet
+ val unsetBrokerLoggersEntries = changedBrokerLoggers
+ .intersect(validLoggers)
+ .map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) }
+ .asJavaCollection
+
+ // ensure that we first reset the root logger to an arbitrary log level. Note that we cannot reset it to its original value
+ alterBrokerLoggers(List(
+ new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET)
+ ).asJavaCollection)
+ alterBrokerLoggers(unsetBrokerLoggersEntries)
+
+ changedBrokerLoggers.clear()
+ }
+ }
}
object AdminClientIntegrationTest {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 8f3f24f..387a7f9 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.ElectionType
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
-import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
import org.apache.kafka.common.message.ControlledShutdownRequestData
@@ -101,6 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter)))
val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe)))
+ val clusterAlterConfigsAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs)))
val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, IdempotentWrite)))
val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create)))
val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read)))
@@ -215,8 +216,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error),
ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())),
- ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) =>
- IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error),
+ ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => {
+ val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic))
+ if (topicResourceError == null)
+ IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error
+ else
+ topicResourceError.error()
+ }),
ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())),
ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode()))
)
@@ -651,6 +657,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
+ def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(): Unit = {
+ val data = new IncrementalAlterConfigsRequestData
+ val alterableConfig = new AlterableConfig().setName("kafka.controller.KafkaController").
+ setValue(LogLevelConfig.DEBUG_LOG_LEVEL).setConfigOperation(AlterConfigOp.OpType.DELETE.id())
+ val alterableConfigSet = new AlterableConfigCollection
+ alterableConfigSet.add(alterableConfig)
+ data.resources().add(new AlterConfigsResource().
+ setResourceName(brokerId.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id()).
+ setConfigs(alterableConfigSet))
+ val key = ApiKeys.INCREMENTAL_ALTER_CONFIGS
+ val request = new IncrementalAlterConfigsRequest.Builder(data).build()
+
+ removeAllAcls()
+ val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false)
+
+ val clusterAcls = clusterAlterConfigsAcl(Resource.ClusterResource)
+ addAndVerifyAcls(clusterAcls, Resource.ClusterResource)
+ sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true)
+ }
+
+ @Test
def testOffsetsForLeaderEpochClusterPermission(): Unit = {
val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH
val request = offsetsForLeaderEpochRequest
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index bd26a61..e3396bb 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
import kafka.admin.ConfigCommand.ConfigCommandOptions
import kafka.api.ApiVersion
import kafka.cluster.{Broker, EndPoint}
-import kafka.server.{ConfigEntityName, KafkaConfig}
+import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig}
import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.clients.admin._
@@ -101,33 +101,44 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
testArgumentParse("brokers")
}
- def testArgumentParse(entityType: String) = {
+ @Test
+ def shouldParseArgumentsForBrokerLoggersEntityType() {
+ testArgumentParse("broker-loggers",
+ zkConfig = false)
+ }
+
+ def testArgumentParse(entityType: String, zkConfig: Boolean=true): Unit = {
+ val connectOpts = if (zkConfig)
+ ("--zookeeper", zkConnect)
+ else
+ ("--bootstrap-server", "localhost:9092")
+
// Should parse correctly
- var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
- "--entity-name", "x",
+ var createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
"--entity-type", entityType,
"--describe"))
createOpts.checkArgs()
// For --alter and added config
- createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
- "--entity-name", "x",
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
"--entity-type", entityType,
"--alter",
"--add-config", "a=b,c=d"))
createOpts.checkArgs()
// For alter and deleted config
- createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
- "--entity-name", "x",
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
"--entity-type", entityType,
"--alter",
"--delete-config", "a,b,c"))
createOpts.checkArgs()
// For alter and both added, deleted config
- createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
- "--entity-name", "x",
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
"--entity-type", entityType,
"--alter",
"--add-config", "a=b,c=d",
@@ -143,8 +154,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
assertEquals(1, deletedProps.size)
assertEquals("a", deletedProps.head)
- createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
- "--entity-name", "x",
+ createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2,
+ "--entity-name", "1",
"--entity-type", entityType,
"--alter",
"--add-config", "a=b,c=,d=e,f="))
@@ -165,6 +176,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
}
+ @Test(expected = classOf[IllegalArgumentException])
+ def shouldFailIfBrokerEntityTypeIsNotAnInteger(): Unit = {
+ val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"))
+ ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
+ }
+
@Test
def shouldAddClientConfig(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -229,6 +247,71 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def shouldAddBrokerLoggerConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ verifyAlterBrokerLoggerConfig(node, "1", "1", List(
+ new ConfigEntry("kafka.log.LogCleaner", "INFO"),
+ new ConfigEntry("kafka.server.ReplicaManager", "INFO"),
+ new ConfigEntry("kafka.server.KafkaApi", "INFO")
+ ))
+ }
+
+ @Test
+ def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = {
+ val optsList = List("--zookeeper", "localhost:9092",
+ "--entity-type", ConfigType.Broker,
+ "--describe"
+ )
+
+ new ConfigCommandOptions(optsList.toArray).checkArgs()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testNoSpecifiedEntityOptionWithDescribeBrokersInBootstrapServerIsNotAllowed(): Unit = {
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", ConfigType.Broker,
+ "--describe"
+ )
+
+ new ConfigCommandOptions(optsList.toArray).checkArgs()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+ "--entity-default",
+ "--describe"
+ )
+
+ new ConfigCommandOptions(optsList.toArray).checkArgs()
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testEntityDefaultOptionWithAlterBrokerLoggerIsNotAllowed(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+ "--entity-default",
+ "--alter",
+ "--add-config", "kafka.log.LogCleaner=DEBUG"
+ )
+
+ new ConfigCommandOptions(optsList.toArray).checkArgs()
+ }
+
+ @Test(expected = classOf[InvalidConfigurationException])
+ def shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLoggerConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ // verifyAlterBrokerLoggerConfig tries to alter kafka.log.LogCleaner, kafka.server.ReplicaManager and kafka.server.KafkaApi
+ // yet, we make it so DescribeConfigs returns only one logger, implying that kafka.server.ReplicaManager and kafka.log.LogCleaner are invalid
+ verifyAlterBrokerLoggerConfig(node, "1", "1", List(
+ new ConfigEntry("kafka.server.KafkaApi", "INFO")
+ ))
+ }
+
+ @Test
def shouldAddDefaultBrokerDynamicConfig(): Unit = {
val node = new Node(1, "localhost", 9092)
verifyAlterBrokerConfig(node, "", List("--entity-default"))
@@ -274,11 +357,66 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
}
EasyMock.replay(alterResult, describeResult)
- ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName)
+ ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigType.Broker, resourceName)
assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap)
EasyMock.reset(alterResult, describeResult)
}
+ def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String,
+ describeConfigEntries: List[ConfigEntry]): Unit = {
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+ "--alter",
+ "--entity-name", entityName,
+ "--add-config", "kafka.log.LogCleaner=DEBUG",
+ "--delete-config", "kafka.server.ReplicaManager,kafka.server.KafkaApi")
+ val alterOpts = new ConfigCommandOptions(optsList.toArray)
+ var alteredConfigs = false
+
+ val resource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, resourceName)
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ future.complete(util.Collections.singletonMap(resource, new Config(describeConfigEntries.asJava)))
+ val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+ val alterFuture = new KafkaFutureImpl[Void]
+ alterFuture.complete(null)
+ val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+ EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+ val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
+ assertEquals(1, resources.size)
+ val resource = resources.iterator.next
+ assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
+ assertEquals(resourceName, resource.name)
+ describeResult
+ }
+
+ override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]], options: AlterConfigsOptions): AlterConfigsResult = {
+ assertEquals(1, configs.size)
+ val entry = configs.entrySet.iterator.next
+ val resource = entry.getKey
+ val alterConfigOps = entry.getValue
+ assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
+ assertEquals(3, alterConfigOps.size)
+
+ val expectedConfigOps = List(
+ new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", "DEBUG"), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
+ new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", ""), AlterConfigOp.OpType.DELETE)
+ )
+ assertEquals(expectedConfigOps, alterConfigOps.asScala.toList)
+ alteredConfigs = true
+ alterResult
+ }
+ }
+ EasyMock.replay(alterResult, describeResult)
+ ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigCommand.BrokerLoggerConfigType, resourceName)
+ assertTrue(alteredConfigs)
+ EasyMock.reset(alterResult, describeResult)
+ }
+
@Test
def shouldSupportCommaSeparatedValues(): Unit = {
val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,