You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 03:14:41 UTC

svn commit: r1377175 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: Kafka.scala producer/async/DefaultEventHandler.scala utils/Log4jController.scala utils/Logging.scala utils/Utils.scala

Author: junrao
Date: Sat Aug 25 01:14:41 2012
New Revision: 1377175

URL: http://svn.apache.org/viewvc?rev=1377175&view=rev
Log:
Expose JMX operation to set logger level dynamically (0.8 branch); patched by Jun Rao; reviewed by Jay Kreps; KAFKA-429

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Sat Aug 25 01:14:41 2012
@@ -17,18 +17,13 @@
 
 package kafka
 
-
 import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
 import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
 import utils.{Utils, Logging}
-import org.apache.log4j.jmx.LoggerDynamicMBean
 
 object Kafka extends Logging {
 
   def main(args: Array[String]): Unit = {
-    val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
-    import org.apache.log4j.Logger
-    Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
 
     if (args.length != 1) {
       println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Aug 25 01:14:41 2012
@@ -54,7 +54,7 @@ class DefaultEventHandler[K,V](config: P
         }
       }
       if(outstandingProduceRequests.size > 0) {
-        error("Failed to send the following reqeusts: " + outstandingProduceRequests)
+        error("Failed to send the following requests: " + outstandingProduceRequests)
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
     }
@@ -81,7 +81,7 @@ class DefaultEventHandler[K,V](config: P
             }
           }
         } catch {
-          case t: Throwable => error("Failed to send messages")
+          case t: Throwable => error("Failed to send messages", t)
         }
         failedProduceRequests
       case None => // all produce requests failed

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala?rev=1377175&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala Sat Aug 25 01:14:41 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import org.apache.log4j.{Logger, Level, LogManager}
+import java.util
+
+
+object Log4jController {
+
+  private val controller = new Log4jController
+
+  Utils.registerMBean(controller, "kafka:type=kafka.Log4jController")
+
+}
+
+
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
+ * of the companion object.
+ */
+private class Log4jController extends Log4jControllerMBean {
+
+  def getLoggers = {
+    val lst = new util.ArrayList[String]()
+    lst.add("root=" + existingLogger("root").getLevel.toString)
+    val loggers = LogManager.getCurrentLoggers
+    while (loggers.hasMoreElements) {
+      val logger = loggers.nextElement().asInstanceOf[Logger]
+      if (logger != null) {
+        val level =  if (logger != null) logger.getLevel else null
+        lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
+      }
+    }
+    lst
+  }
+
+
+  private def newLogger(loggerName: String) =
+    if (loggerName == "root")
+      LogManager.getRootLogger
+    else LogManager.getLogger(loggerName)
+
+
+  private def existingLogger(loggerName: String) =
+    if (loggerName == "root")
+      LogManager.getRootLogger
+    else LogManager.exists(loggerName)
+
+
+  def getLogLevel(loggerName: String) = {
+    val log = existingLogger(loggerName)
+    if (log != null) {
+      val level = log.getLevel
+      if (level != null)
+        log.getLevel.toString
+      else "Null log level."
+    }
+    else "No such logger."
+  }
+
+
+  def setLogLevel(loggerName: String, level: String) = {
+    val log = newLogger(loggerName)
+    if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
+      log.setLevel(Level.toLevel(level.toUpperCase))
+      true
+    }
+    else false
+  }
+
+}
+
+
+private trait Log4jControllerMBean {
+  def getLoggers: java.util.List[String]
+  def getLogLevel(logger: String): String
+  def setLogLevel(logger: String, level: String): Boolean
+}
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Sat Aug 25 01:14:41 2012
@@ -24,7 +24,10 @@ trait Logging {
   lazy val logger = Logger.getLogger(loggerName)
 
   protected var logIdent = ""
-  
+
+  // Force initialization to register Log4jControllerMBean
+  private val log4jController = Log4jController
+
   private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
 
   def trace(msg: => String): Unit = {
@@ -111,4 +114,4 @@ trait Logging {
   def fatal(msg: => String, e: => Throwable) = {
     logger.fatal(msgWithLogIdent(msg),e)
   }
-}
\ No newline at end of file
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 01:14:41 2012
@@ -525,7 +525,7 @@ object Utils extends Logging {
   /**
    * Read an unsigned integer from the current position in the buffer, 
    * incrementing the position by 4 bytes
-   * @param The buffer to read from
+   * @param buffer The buffer to read from
    * @return The integer read, as a long to avoid signedness
    */
   def getUnsignedInt(buffer: ByteBuffer): Long =