Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/09/30 10:55:26 UTC

[GitHub] [kafka] tombentley commented on a change in pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

tombentley commented on a change in pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#discussion_r497371849



##########
File path: config/connect-log4j2.properties
##########
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name=ConnectConfig
+appenders=stdout,connectAppender
+
+# Send the logs to the console.
+#
+appender.stdout.type=Console
+appender.stdout.name=STDOUT
+appender.stdout.layout.type=PatternLayout
+
+# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
+# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
+# and compressed in the same directory but with a filename that ends in the `DatePattern` option.
+#
+appender.connectAppender.type=RollingFile
+appender.connectAppender.name=CONNECT_APPENDER
+appender.connectAppender.fileName=${kafka.logs.dir}/connect.log
+appender.connectAppender.filePattern=${kafka.logs.dir}/connect.log.%d{yyyy-MM-dd}.log.gz
+appender.connectAppender.layout.type=PatternLayout
+appender.connectAppender.policies.type=Policies
+appender.connectAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.connectAppender.policies.time.interval=1
+appender.connectAppender.policies.time.modulate=true
+appender.connectAppender.strategy.type=DefaultRolloverStrategy
+appender.connectAppender.strategy.max=1
+
+# The `%X{connector.context}` parameter in the layout includes connector-specific and task-specific information
+# in the log message, where appropriate. This makes it easier to identify those log messages that apply to a
+# specific connector. Simply add this parameter to the log layout configuration below to include the contextual information.
+#
+connect.log.pattern=[%d] %p %m (%c:%L)%n
+#connect.log.pattern=[%d] %p %X{connector.context}%m (%c:%L)%n

Review comment:
       Shouldn't `connect.log.pattern` actually be something like this:
   
   ```
   appender.stdout.layout.pattern=[%d] %p %m (%c:%L)%n
   # and 
   appender.connectAppender.layout.pattern=[%d] %p %m (%c:%L)%n
   ```
   ?
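
    If the intent is to define the pattern once and share it between the appenders, log4j2's properties format also supports property substitution, so something like this might work (untested; `connectLogPattern` is just an illustrative name):

    ```
    # define the shared pattern once...
    property.connectLogPattern=[%d] %p %m (%c:%L)%n
    # ...and reference it from each layout
    appender.stdout.layout.pattern=${connectLogPattern}
    appender.connectAppender.layout.pattern=${connectLogPattern}
    ```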

##########
File path: config/connect-log4j2.properties
##########
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name=ConnectConfig
+appenders=stdout,connectAppender
+
+# Send the logs to the console.
+#
+appender.stdout.type=Console
+appender.stdout.name=STDOUT
+appender.stdout.layout.type=PatternLayout
+
+# Send the logs to a file, rolling the file at midnight local time. For example, the `File` option specifies the
+# location of the log files (e.g. ${kafka.logs.dir}/connect.log), and at midnight local time the file is closed
+# and compressed in the same directory but with a filename that ends in the `DatePattern` option.
+#
+appender.connectAppender.type=RollingFile
+appender.connectAppender.name=CONNECT_APPENDER
+appender.connectAppender.fileName=${kafka.logs.dir}/connect.log
+appender.connectAppender.filePattern=${kafka.logs.dir}/connect.log.%d{yyyy-MM-dd}.log.gz
+appender.connectAppender.layout.type=PatternLayout
+appender.connectAppender.policies.type=Policies

Review comment:
       Is the `policies` really necessary if we're only using a single triggering policy? I _think_ you could just say:
   
   ```
   appender.connectAppender.policy.type=TimeBasedTriggeringPolicy
   appender.connectAppender.policy.interval=1
   appender.connectAppender.policy.modulate=true
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
##########
@@ -60,67 +66,62 @@
     @GET
     @Path("/")
     public Response listLoggers() {
-        Map<String, Map<String, String>> loggers = new TreeMap<>();
-        Enumeration<Logger> enumeration = currentLoggers();
-        Collections.list(enumeration)
-                .stream()
-                .filter(logger -> logger.getLevel() != null)
-                .forEach(logger -> loggers.put(logger.getName(), levelToMap(logger)));
+        // current loggers
+        final Map<String, Map<String, String>> loggers = currentLoggers()
+            .stream()
+            .filter(logger -> logger.getLevel() != Level.OFF)
+            .collect(Collectors.toMap(logger -> logger.getName(), logger -> levelToMap(logger)));
 
+        // Replace "" logger to "root" logger
         Logger root = rootLogger();
-        if (root.getLevel() != null) {
+        if (root.getLevel() != Level.OFF) {
             loggers.put(ROOT_LOGGER_NAME, levelToMap(root));
         }
 
-        return Response.ok(loggers).build();
+        return Response.ok(new TreeMap<>(loggers)).build();
     }
 
     /**
      * Get the log level of a named logger.
      *
-     * @param namedLogger name of a logger
+     * @param loggerName name of a logger
      * @return level of the logger, effective level if the level was not explicitly set.
      */
     @GET
     @Path("/{logger}")
-    public Response getLogger(final @PathParam("logger") String namedLogger) {
-        Objects.requireNonNull(namedLogger, "require non-null name");
+    public Response getLogger(final @PathParam("logger") String loggerName) {
+        Objects.requireNonNull(loggerName, "require non-null name");
 
         Logger logger = null;
-        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(loggerName)) {
             logger = rootLogger();
         } else {
-            Enumeration<Logger> en = currentLoggers();
-            // search within existing loggers for the given name.
-            // using LogManger.getLogger() will create a logger if it doesn't exist
-            // (potential leak since these don't get cleaned up).
-            while (en.hasMoreElements()) {
-                Logger l = en.nextElement();
-                if (namedLogger.equals(l.getName())) {
-                    logger = l;
-                    break;
-                }
+            List<Logger> en = currentLoggers();
+            Optional<Logger> found = en.stream().filter(existingLogger -> loggerName.equals(existingLogger.getName())).findAny();
+
+            if (found.isPresent()) {
+                logger = found.get();

Review comment:
       You can avoid the `if` by using `found.orElse(null)`.
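
    For example (untested sketch based on the diff above):

    ```
    Logger logger = currentLoggers().stream()
        .filter(existingLogger -> loggerName.equals(existingLogger.getName()))
        .findAny()
        .orElse(null);
    ```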

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
##########
@@ -60,67 +66,62 @@
     @GET
     @Path("/")
     public Response listLoggers() {
-        Map<String, Map<String, String>> loggers = new TreeMap<>();
-        Enumeration<Logger> enumeration = currentLoggers();
-        Collections.list(enumeration)
-                .stream()
-                .filter(logger -> logger.getLevel() != null)
-                .forEach(logger -> loggers.put(logger.getName(), levelToMap(logger)));
+        // current loggers
+        final Map<String, Map<String, String>> loggers = currentLoggers()
+            .stream()
+            .filter(logger -> logger.getLevel() != Level.OFF)
+            .collect(Collectors.toMap(logger -> logger.getName(), logger -> levelToMap(logger)));
 
+        // Replace "" logger to "root" logger
         Logger root = rootLogger();
-        if (root.getLevel() != null) {
+        if (root.getLevel() != Level.OFF) {
             loggers.put(ROOT_LOGGER_NAME, levelToMap(root));
         }
 
-        return Response.ok(loggers).build();
+        return Response.ok(new TreeMap<>(loggers)).build();
     }
 
     /**
      * Get the log level of a named logger.
      *
-     * @param namedLogger name of a logger
+     * @param loggerName name of a logger

Review comment:
       Was changing the parameter name really necessary? It makes the diff noisier and the old name wasn't _so_ bad.

##########
File path: core/src/main/scala/kafka/utils/Log4jController.scala
##########
@@ -20,62 +20,73 @@ package kafka.utils
 import java.util
 import java.util.Locale
 
-import org.apache.log4j.{Level, LogManager, Logger}
+import org.apache.logging.log4j.core.LoggerContext
+import org.apache.logging.log4j.{Level, LogManager}
+import org.apache.logging.log4j.core.config.{Configurator, LoggerConfig}
 
-import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 
 object Log4jController {
+
+  /**
+   * Note: In log4j, the root logger's name was "root" and Kafka also followed that name for dynamic logging control feature.
+   *
+   * The root logger's name is changed in log4j2 to empty string (see: [[LogManager.ROOT_LOGGER_NAME]]) but for backward-
+   * compatibility. Kafka keeps its original root logger name. It is why here is a dedicated definition for the root logger name.
+   */
   val ROOT_LOGGER = "root"
 
   /**
     * Returns a map of the log4j loggers and their assigned log level.
     * If a logger does not have a log level assigned, we return the root logger's log level
     */
-  def loggers: mutable.Map[String, String] = {
-    val logs = new mutable.HashMap[String, String]()
-    val rootLoggerLvl = existingLogger(ROOT_LOGGER).getLevel.toString
-    logs.put(ROOT_LOGGER, rootLoggerLvl)
-
-    val loggers = LogManager.getCurrentLoggers
-    while (loggers.hasMoreElements) {
-      val logger = loggers.nextElement().asInstanceOf[Logger]
-      if (logger != null) {
-        val level = if (logger.getLevel != null) logger.getLevel.toString else rootLoggerLvl
-        logs.put(logger.getName, level)
-      }
-    }
-    logs
+  def loggers: Map[String, String] = {
+    val logContext = LogManager.getContext(false).asInstanceOf[LoggerContext]
+    val rootLoggerLevel = logContext.getRootLogger.getLevel.toString
+
+    logContext.getLoggers.asScala
+      .filter(_.getName != LogManager.ROOT_LOGGER_NAME)
+      .map { logger =>
+        val effectiveLevel = if (logger.getLevel != null) logger.getLevel.toString else rootLoggerLevel

Review comment:
       The default when the level is not set should be the nearest ancestor logger's level, rather than the root logger's level. For example, with `kafka=ERROR` configured, an unset `kafka.cluster.Partition` should report ERROR (inherited from `kafka`), not the root level. But I guess you're waiting for my PR to be merged, right?

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -580,7 +581,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     val assignment = Map(tp.partition -> Seq(0))
     TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
 
-    testControllerMove(() => zkClient.createPreferredReplicaElection(Set(tp)))
+    testControllerMove(
+      () => zkClient.createPreferredReplicaElection(Set(tp)),
+      classOf[ControllerIntegrationTest].toString + "#testControllerMoveOnPreferredReplicaElection")

Review comment:
       same comment.

##########
File path: streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
##########
@@ -81,7 +81,6 @@ public void setUp() {
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
-        streamsConfig = new StreamsConfig(props);

Review comment:
       Was this really necessary?

##########
File path: config/log4j2.properties
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name=KafkaConfig
+
+# Unspecified loggers and loggers with additivity=true output to server.log and stdout
+appenders=stdout,kafkaAppender,requestAppender,controllerAppender,cleanerAppender,stateChangeAppender,authorizerAppender
+
+appender.stdout.type=Console
+appender.stdout.name=STDOUT
+appender.stdout.layout.type=PatternLayout
+appender.stdout.layout.pattern=[%d] %p %m (%c)%n
+
+appender.kafkaAppender.type=RollingFile
+appender.kafkaAppender.name=KAFKA_APPENDER
+appender.kafkaAppender.fileName=${kafka.logs.dir}/server.log
+appender.kafkaAppender.filePattern=${kafka.logs.dir}/server.log.%d{yyyy-MM-dd}.log.gz
+appender.kafkaAppender.layout.type=PatternLayout
+appender.kafkaAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.kafkaAppender.policies.type=Policies
+appender.kafkaAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.kafkaAppender.policies.time.interval=1
+appender.kafkaAppender.policies.time.modulate=true
+appender.kafkaAppender.strategy.type=DefaultRolloverStrategy
+appender.kafkaAppender.strategy.max=1
+
+appender.requestAppender.type=RollingFile
+appender.requestAppender.name=REQUEST_APPENDER
+appender.requestAppender.fileName=${kafka.logs.dir}/kafka-request.log
+appender.requestAppender.filePattern=${kafka.logs.dir}/kafka-request.log.%d{yyyy-MM-dd}.log.gz
+appender.requestAppender.layout.type=PatternLayout
+appender.requestAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.requestAppender.policies.type=Policies
+appender.requestAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.requestAppender.policies.time.interval=1
+appender.requestAppender.policies.time.modulate=true
+appender.requestAppender.strategy.type=DefaultRolloverStrategy
+appender.requestAppender.strategy.max=1
+
+appender.controllerAppender.type=RollingFile
+appender.controllerAppender.name=CONTROLLER_APPENDER
+appender.controllerAppender.fileName=${kafka.logs.dir}/controller.log
+appender.controllerAppender.filePattern=${kafka.logs.dir}/controller.log.%d{yyyy-MM-dd}.log.gz
+appender.controllerAppender.layout.type=PatternLayout
+appender.controllerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.controllerAppender.policies.type=Policies
+appender.controllerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.controllerAppender.policies.time.interval=1
+appender.controllerAppender.policies.time.modulate=true
+appender.controllerAppender.strategy.type=DefaultRolloverStrategy
+appender.controllerAppender.strategy.max=1
+
+appender.cleanerAppender.type=RollingFile
+appender.cleanerAppender.name=CLEANER_APPENDER
+appender.cleanerAppender.fileName=${kafka.logs.dir}/log-cleaner.log
+appender.cleanerAppender.filePattern=${kafka.logs.dir}/log-cleaner.log.%d{yyyy-MM-dd}.log.gz
+appender.cleanerAppender.layout.type=PatternLayout
+appender.cleanerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.cleanerAppender.policies.type=Policies
+appender.cleanerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.cleanerAppender.policies.time.interval=1
+appender.cleanerAppender.policies.time.modulate=true
+appender.cleanerAppender.strategy.type=DefaultRolloverStrategy
+appender.cleanerAppender.strategy.max=1
+
+appender.stateChangeAppender.type=RollingFile
+appender.stateChangeAppender.name=STATE_CHANGE_APPENDER
+appender.stateChangeAppender.fileName=${kafka.logs.dir}/state-change.log
+appender.stateChangeAppender.filePattern=${kafka.logs.dir}/state-change.log.%d{yyyy-MM-dd}.log.gz
+appender.stateChangeAppender.layout.type=PatternLayout
+appender.stateChangeAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.stateChangeAppender.policies.type=Policies
+appender.stateChangeAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.stateChangeAppender.policies.time.interval=1
+appender.stateChangeAppender.policies.time.modulate=true
+appender.stateChangeAppender.strategy.type=DefaultRolloverStrategy
+appender.stateChangeAppender.strategy.max=1
+
+appender.authorizerAppender.type=RollingFile
+appender.authorizerAppender.name=AUTHORIZER_APPENDER
+appender.authorizerAppender.fileName=${kafka.logs.dir}/kafka-authorizer.log
+appender.authorizerAppender.filePattern=${kafka.logs.dir}/kafka-authorizer.log.%d{yyyy-MM-dd}.log.gz
+appender.authorizerAppender.layout.type=PatternLayout
+appender.authorizerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.authorizerAppender.policies.type=Policies
+appender.authorizerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.authorizerAppender.policies.time.interval=1
+appender.authorizerAppender.policies.time.modulate=true
+appender.authorizerAppender.strategy.type=DefaultRolloverStrategy
+appender.authorizerAppender.strategy.max=1
+
+# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
+rootLogger.level=INFO
+rootLogger.appenderRefs=stdout,kafkaAppender
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.kafkaAppender.ref=KAFKA_APPENDER
+
+loggers=org.apache.zookeeper,kafka,org.apache.kafka,kafka.request.logger,kafka.network.RequestChannel$,kafka.network.Processor,kafka.server.KafkaApis,kafka.network.RequestChannel$,kafka.controller,kafka.log.LogCleaner,state.change.logger,kafka.authorizer.logger

Review comment:
       Also, since log4j2 I think it's often unnecessary to specify `loggers` and `appenders` upfront; they can be determined during parsing. Unfortunately the docs don't say when it _is_ necessary, but it would be easy for users to add their `logger....name` and `logger....level` properties only to find they didn't work because they forgot to add the name to `loggers`. So it would be good to only specify `loggers` if we really need to.
   
   Same applies to appenders and the other log4j config files too, of course.
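
    For example, a user who adds only these two lines (hypothetical logger name):

    ```
    logger.myconnector.name=com.example.MyConnector
    logger.myconnector.level=DEBUG
    ```

    would find them silently ignored unless `myconnector` were also appended to the `loggers` list.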

##########
File path: config/tools-log4j2.properties
##########
@@ -12,10 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-log4j.rootLogger=OFF, stdout
+name=ToolsConfig
+appenders=stderr
 
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+appender.stderr.type=Console
+appender.stderr.name=STDERR
+appender.stderr.layout.type=PatternLayout
+appender.stderr.layout.pattern=[%d] %p %m (%c)%n
 
-log4j.logger.org.apache.kafka=ERROR
+rootLogger.level=WARN

Review comment:
       Why was `log4j.rootLogger=OFF` before but `WARN` now?

##########
File path: config/log4j2.properties
##########
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+name=KafkaConfig
+
+# Unspecified loggers and loggers with additivity=true output to server.log and stdout
+appenders=stdout,kafkaAppender,requestAppender,controllerAppender,cleanerAppender,stateChangeAppender,authorizerAppender
+
+appender.stdout.type=Console
+appender.stdout.name=STDOUT
+appender.stdout.layout.type=PatternLayout
+appender.stdout.layout.pattern=[%d] %p %m (%c)%n
+
+appender.kafkaAppender.type=RollingFile
+appender.kafkaAppender.name=KAFKA_APPENDER
+appender.kafkaAppender.fileName=${kafka.logs.dir}/server.log
+appender.kafkaAppender.filePattern=${kafka.logs.dir}/server.log.%d{yyyy-MM-dd}.log.gz
+appender.kafkaAppender.layout.type=PatternLayout
+appender.kafkaAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.kafkaAppender.policies.type=Policies
+appender.kafkaAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.kafkaAppender.policies.time.interval=1
+appender.kafkaAppender.policies.time.modulate=true
+appender.kafkaAppender.strategy.type=DefaultRolloverStrategy
+appender.kafkaAppender.strategy.max=1
+
+appender.requestAppender.type=RollingFile
+appender.requestAppender.name=REQUEST_APPENDER
+appender.requestAppender.fileName=${kafka.logs.dir}/kafka-request.log
+appender.requestAppender.filePattern=${kafka.logs.dir}/kafka-request.log.%d{yyyy-MM-dd}.log.gz
+appender.requestAppender.layout.type=PatternLayout
+appender.requestAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.requestAppender.policies.type=Policies
+appender.requestAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.requestAppender.policies.time.interval=1
+appender.requestAppender.policies.time.modulate=true
+appender.requestAppender.strategy.type=DefaultRolloverStrategy
+appender.requestAppender.strategy.max=1
+
+appender.controllerAppender.type=RollingFile
+appender.controllerAppender.name=CONTROLLER_APPENDER
+appender.controllerAppender.fileName=${kafka.logs.dir}/controller.log
+appender.controllerAppender.filePattern=${kafka.logs.dir}/controller.log.%d{yyyy-MM-dd}.log.gz
+appender.controllerAppender.layout.type=PatternLayout
+appender.controllerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.controllerAppender.policies.type=Policies
+appender.controllerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.controllerAppender.policies.time.interval=1
+appender.controllerAppender.policies.time.modulate=true
+appender.controllerAppender.strategy.type=DefaultRolloverStrategy
+appender.controllerAppender.strategy.max=1
+
+appender.cleanerAppender.type=RollingFile
+appender.cleanerAppender.name=CLEANER_APPENDER
+appender.cleanerAppender.fileName=${kafka.logs.dir}/log-cleaner.log
+appender.cleanerAppender.filePattern=${kafka.logs.dir}/log-cleaner.log.%d{yyyy-MM-dd}.log.gz
+appender.cleanerAppender.layout.type=PatternLayout
+appender.cleanerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.cleanerAppender.policies.type=Policies
+appender.cleanerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.cleanerAppender.policies.time.interval=1
+appender.cleanerAppender.policies.time.modulate=true
+appender.cleanerAppender.strategy.type=DefaultRolloverStrategy
+appender.cleanerAppender.strategy.max=1
+
+appender.stateChangeAppender.type=RollingFile
+appender.stateChangeAppender.name=STATE_CHANGE_APPENDER
+appender.stateChangeAppender.fileName=${kafka.logs.dir}/state-change.log
+appender.stateChangeAppender.filePattern=${kafka.logs.dir}/state-change.log.%d{yyyy-MM-dd}.log.gz
+appender.stateChangeAppender.layout.type=PatternLayout
+appender.stateChangeAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.stateChangeAppender.policies.type=Policies
+appender.stateChangeAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.stateChangeAppender.policies.time.interval=1
+appender.stateChangeAppender.policies.time.modulate=true
+appender.stateChangeAppender.strategy.type=DefaultRolloverStrategy
+appender.stateChangeAppender.strategy.max=1
+
+appender.authorizerAppender.type=RollingFile
+appender.authorizerAppender.name=AUTHORIZER_APPENDER
+appender.authorizerAppender.fileName=${kafka.logs.dir}/kafka-authorizer.log
+appender.authorizerAppender.filePattern=${kafka.logs.dir}/kafka-authorizer.log.%d{yyyy-MM-dd}.log.gz
+appender.authorizerAppender.layout.type=PatternLayout
+appender.authorizerAppender.layout.pattern=[%d] %p %m (%c)%n
+appender.authorizerAppender.policies.type=Policies
+appender.authorizerAppender.policies.time.type=TimeBasedTriggeringPolicy
+appender.authorizerAppender.policies.time.interval=1
+appender.authorizerAppender.policies.time.modulate=true
+appender.authorizerAppender.strategy.type=DefaultRolloverStrategy
+appender.authorizerAppender.strategy.max=1
+
+# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
+rootLogger.level=INFO
+rootLogger.appenderRefs=stdout,kafkaAppender
+rootLogger.appenderRef.stdout.ref=STDOUT
+rootLogger.appenderRef.kafkaAppender.ref=KAFKA_APPENDER
+
+loggers=org.apache.zookeeper,kafka,org.apache.kafka,kafka.request.logger,kafka.network.RequestChannel$,kafka.network.Processor,kafka.server.KafkaApis,kafka.network.RequestChannel$,kafka.controller,kafka.log.LogCleaner,state.change.logger,kafka.authorizer.logger

Review comment:
       Use a `\` escaped newline to avoid the very long line.
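
    i.e. something like this, using standard properties-file line continuation (untested):

    ```
    loggers=org.apache.zookeeper,\
      kafka,\
      org.apache.kafka,\
      kafka.request.logger,\
      kafka.network.RequestChannel$,\
      kafka.network.Processor,\
      kafka.server.KafkaApis,\
      kafka.controller,\
      kafka.log.LogCleaner,\
      state.change.logger,\
      kafka.authorizer.logger
    ```

    (The duplicate `kafka.network.RequestChannel$` entry could be dropped at the same time.)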

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
##########
@@ -60,67 +66,62 @@
     @GET
     @Path("/")
     public Response listLoggers() {
-        Map<String, Map<String, String>> loggers = new TreeMap<>();
-        Enumeration<Logger> enumeration = currentLoggers();
-        Collections.list(enumeration)
-                .stream()
-                .filter(logger -> logger.getLevel() != null)
-                .forEach(logger -> loggers.put(logger.getName(), levelToMap(logger)));
+        // current loggers
+        final Map<String, Map<String, String>> loggers = currentLoggers()
+            .stream()
+            .filter(logger -> logger.getLevel() != Level.OFF)
+            .collect(Collectors.toMap(logger -> logger.getName(), logger -> levelToMap(logger)));

Review comment:
       `collect` has an overload which takes a supplier of empty maps (I think you also have to provide a lambda for duplicate keys, annoyingly). That would allow you to avoid needing the `new TreeMap` in the `return`. Alternatively, just use `forEach` as previously.
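
    The first option would look something like this, assuming the helpers from the diff (untested):

    ```
    // the merge function is required by this overload, even though duplicate
    // logger names aren't expected here
    final Map<String, Map<String, String>> loggers = currentLoggers()
        .stream()
        .filter(logger -> logger.getLevel() != Level.OFF)
        .collect(Collectors.toMap(
            Logger::getName,
            logger -> levelToMap(logger),
            (first, second) -> first,
            TreeMap::new));
    ```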

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
##########
@@ -133,20 +134,18 @@ public Response setLevel(final @PathParam("logger") String namedLogger,
         }
 
         List<Logger> childLoggers;
-        if (ROOT_LOGGER_NAME.equalsIgnoreCase(namedLogger)) {
-            childLoggers = Collections.list(currentLoggers());
+        if (ROOT_LOGGER_NAME.equalsIgnoreCase(loggerName)) {
+            childLoggers = new ArrayList<>(currentLoggers());
             childLoggers.add(rootLogger());
         } else {
             childLoggers = new ArrayList<>();
-            Logger ancestorLogger = lookupLogger(namedLogger);
-            Enumeration en = currentLoggers();
+            Logger ancestorLogger = lookupLogger(loggerName);
             boolean present = false;
-            while (en.hasMoreElements()) {
-                Logger current = (Logger) en.nextElement();
-                if (current.getName().startsWith(namedLogger)) {
-                    childLoggers.add(current);
+            for (Logger logger : currentLoggers()) {
+                if (logger.getName().startsWith(loggerName)) {

Review comment:
       It's a pre-existing issue, but I think this is slightly incorrect since it would treat `com.foo` as an ancestor of `com.foobar`. Really we should be using `startsWith` with a logger name that we know ends with a `.`.
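
    i.e. something like (untested; `descendantPrefix` is just an illustrative name):

    ```
    // match the named logger itself or true descendants only, so that
    // "com.foo" is not treated as an ancestor of "com.foobar"
    String descendantPrefix = loggerName + ".";
    for (Logger logger : currentLoggers()) {
        if (logger.getName().equals(loggerName) || logger.getName().startsWith(descendantPrefix)) {
            childLoggers.add(logger);
        }
    }
    ```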

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java
##########
@@ -158,43 +157,50 @@ public Response setLevel(final @PathParam("logger") String namedLogger,
         List<String> modifiedLoggerNames = new ArrayList<>();
         for (Logger logger: childLoggers) {
             logger.setLevel(level);
-            modifiedLoggerNames.add(logger.getName());
+            if ("".equals(logger.getName())) {
+                modifiedLoggerNames.add("root");
+            } else {
+                modifiedLoggerNames.add(logger.getName());
+            }
         }
         Collections.sort(modifiedLoggerNames);
 
         return Response.ok(modifiedLoggerNames).build();
     }
 
-    protected Logger lookupLogger(String namedLogger) {
-        return LogManager.getLogger(namedLogger);
+    protected Logger lookupLogger(String loggerName) {
+        final LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+
+        return loggerContext.getLogger(loggerName);
     }
 
     @SuppressWarnings("unchecked")
-    protected Enumeration<Logger> currentLoggers() {
-        return LogManager.getCurrentLoggers();
+    protected List<Logger> currentLoggers() {
+        final LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+
+        return loggerContext.getLoggers()
+            .stream()
+            .filter(logger -> !logger.getName().equals(""))

Review comment:
       This is the second time you've got a `.equals("")`. It might be worth factoring it into an `isRootLogger(Logger)` method.
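
    Something like (untested):

    ```
    // in log4j2 the root logger's name is the empty string
    private static boolean isRootLogger(Logger logger) {
        return logger.getName().isEmpty();
    }
    ```

    Then, for example, the filter above becomes `.filter(logger -> !isRootLogger(logger))`.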

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##########
@@ -63,7 +63,7 @@
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*"})
+@PowerMockIgnore({"javax.net.ssl.*", "javax.security.*", "javax.crypto.*", "javax.management.*"})

Review comment:
       Why was this necessary?

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -556,7 +557,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     testControllerMove(() => {
       val adminZkClient = new AdminZkClient(zkClient)
       adminZkClient.createTopicWithAssignment(tp.topic, config = new Properties(), assignment)
-    })
+    }, classOf[ControllerIntegrationTest].toString + "#testControllerMoveOnTopicCreation")

Review comment:
       How about `s"${classOf[ControllerIntegrationTest]}#testControllerMoveOnTopicCreation"`? And if not, there should be no need for the `toString`, since string concatenation applies it implicitly.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -783,14 +788,13 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to resign")
 
       // Expect to capture the ControllerMovedException in the log of ControllerEventThread
-      val event = appender.getMessages.find(e => e.getLevel == Level.INFO
-        && e.getThrowableInformation != null
-        && e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
+      logCaptureContext.await(30, TimeUnit.SECONDS)

Review comment:
       It's a shame about the new asynchrony here, but I don't see an obvious way of avoiding it.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -570,7 +571,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     testControllerMove(() => {
       val adminZkClient = new AdminZkClient(zkClient)
       adminZkClient.deleteTopic(tp.topic())
-    })
+    }, classOf[ControllerIntegrationTest].toString + "#testControllerMoveOnTopicDeletion")

Review comment:
       same comment.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
##########
@@ -592,7 +595,9 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     TestUtils.createTopic(zkClient, tp.topic(), assignment, servers)
 
     val reassignment = Map(tp -> Seq(0))
-    testControllerMove(() => zkClient.createPartitionReassignment(reassignment))
+    testControllerMove(
+      () => zkClient.createPartitionReassignment(reassignment),
+      classOf[ControllerIntegrationTest].toString + "#testControllerMoveOnPartitionReassignment")

Review comment:
       same comment.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -102,6 +103,7 @@
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.management.*")

Review comment:
       Again, it's not really clear to me why this is added.

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##########
@@ -2063,10 +2063,21 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     client = Admin.create(createConfig)
 
     val loggerConfig = describeBrokerLoggers()
-    val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value()
-    val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica")
-    assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger
-    assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name())
+
+    // Root logger is always contained in the loggers list.
+    assertEquals("OFF", loggerConfig.get(Log4jController.ROOT_LOGGER).value())
+
+    // Logger name can't be empty.
+    assertNull(loggerConfig.get(""))
+
+    // Since `kafka` is not defined, it is not contained in the loggers list.
+    assertNull(loggerConfig.get("kafka"))
+
+    // We expect the logger level is inherited from its parent logger level configuration.
+    // For example, `kafka.cluster.Partition` from `kafka` (ERROR).
+    val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Partition")
+    assertEquals("ERROR", logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as its parent logger
+    assertEquals("kafka.cluster.Partition", logCleanerLogLevelConfig.name())

Review comment:
       We probably need better coverage in the alter case (`testIncrementalAlterConfigsForLog4jLogLevels()`, below). It tests inheritance from the root logger, but not an ancestor logger. But I guess this is something I should add to my PR.

##########
File path: core/src/test/scala/unit/kafka/utils/LogCaptureContext.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.apache.logging.log4j.Level
+import org.apache.logging.log4j.core.{LogEvent, LoggerContext}
+import org.apache.logging.log4j.test.appender.ListAppender
+
+import scala.jdk.CollectionConverters._
+
+class LogCaptureContext(listAppender: ListAppender, prevLevelMap: Map[String, Level]) extends AutoCloseable {
+
+  def setLatch(size: Int): Unit = {
+    this.listAppender.countDownLatch = new CountDownLatch(size)
+  }
+
+  @throws[InterruptedException]
+  def await(l: Long, timeUnit: TimeUnit): Unit = {
+    this.listAppender.countDownLatch.await(l, timeUnit)
+  }
+
+  def getMessages: Seq[LogEvent] = listAppender.getEvents.asScala.toSeq
+
+  override def close(): Unit = {
+    val loggerContext = LoggerContext.getContext(false)
+    loggerContext.getRootLogger.removeAppender(listAppender)
+    listAppender.stop()
+
+    // Restore previous logger levels
+    prevLevelMap.foreach { e =>
+      val loggerName = e._1
+      val level = e._2
+      loggerContext.getLogger(loggerName).setLevel(level)
+    }
+  }
+}
+
+object LogCaptureContext {
+  def apply(name: String, levelMap: Map[String, String] = Map()): LogCaptureContext = {

Review comment:
       Do we really need the caller to supply a `name`? It seems to force you to have to construct a unique name each time you want to use it, based on the test class and method name. But all that's needed is uniqueness and the removal of the appender at the end of the test, AFAICS. So just using a UUID or similar generated name would be sufficient and make call sites rather easier to read.
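
    e.g. (untested; `appenderName` is just an illustrative name):

    ```
    val appenderName = s"log-capture-${java.util.UUID.randomUUID()}"
    ```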

##########
File path: core/src/test/scala/unit/kafka/utils/LogCaptureContext.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.apache.logging.log4j.Level
+import org.apache.logging.log4j.core.{LogEvent, LoggerContext}
+import org.apache.logging.log4j.test.appender.ListAppender
+
+import scala.jdk.CollectionConverters._
+
+class LogCaptureContext(listAppender: ListAppender, prevLevelMap: Map[String, Level]) extends AutoCloseable {
+
+  def setLatch(size: Int): Unit = {

Review comment:
       I can see this being a source of difficult-to-maintain tests if you have to tweak the latch every time logging statements are added or removed.

##########
File path: core/src/test/scala/unit/kafka/utils/LogCaptureContext.scala
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.utils
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.apache.logging.log4j.Level
+import org.apache.logging.log4j.core.{LogEvent, LoggerContext}
+import org.apache.logging.log4j.test.appender.ListAppender
+
+import scala.jdk.CollectionConverters._
+
+class LogCaptureContext(listAppender: ListAppender, prevLevelMap: Map[String, Level]) extends AutoCloseable {
+
+  def setLatch(size: Int): Unit = {
+    this.listAppender.countDownLatch = new CountDownLatch(size)
+  }
+
+  @throws[InterruptedException]
+  def await(l: Long, timeUnit: TimeUnit): Unit = {
+    this.listAppender.countDownLatch.await(l, timeUnit)
+  }
+
+  def getMessages: Seq[LogEvent] = listAppender.getEvents.asScala.toSeq

Review comment:
       I wonder if it would simplify the tests if this had methods for asserting the existence of messages (optionally within a timeout) rather than having to use the `setLatch(), await(), getMessages()` pattern in every test.
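
    e.g. something like this (untested sketch; `waitForEvent` is a hypothetical helper):

    ```
    // poll for a matching event instead of pre-counting with a latch
    def waitForEvent(timeoutMs: Long)(predicate: LogEvent => Boolean): LogEvent = {
      val deadline = System.currentTimeMillis() + timeoutMs
      while (System.currentTimeMillis() < deadline) {
        getMessages.find(predicate) match {
          case Some(event) => return event
          case None => Thread.sleep(50)
        }
      }
      throw new AssertionError(s"No matching log event arrived within ${timeoutMs}ms")
    }
    ```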




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org