You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/25 03:14:41 UTC
svn commit: r1377175 - in
/incubator/kafka/branches/0.8/core/src/main/scala/kafka: Kafka.scala
producer/async/DefaultEventHandler.scala utils/Log4jController.scala
utils/Logging.scala utils/Utils.scala
Author: junrao
Date: Sat Aug 25 01:14:41 2012
New Revision: 1377175
URL: http://svn.apache.org/viewvc?rev=1377175&view=rev
Log:
Expose JMX operation to set logger level dynamically (0.8 branch); patched by Jun Rao; reviewed by Jay Kreps; KAFKA-429
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Sat Aug 25 01:14:41 2012
@@ -17,18 +17,13 @@
package kafka
-
import metrics.{KafkaMetricsReporterMBean, KafkaMetricsReporter, KafkaMetricsConfig}
import server.{KafkaConfig, KafkaServerStartable, KafkaServer}
import utils.{Utils, Logging}
-import org.apache.log4j.jmx.LoggerDynamicMBean
object Kafka extends Logging {
def main(args: Array[String]): Unit = {
- val kafkaLog4jMBeanName = "kafka:type=kafka.KafkaLog4j"
- import org.apache.log4j.Logger
- Utils.registerMBean(new LoggerDynamicMBean(Logger.getRootLogger()), kafkaLog4jMBeanName)
if (args.length != 1) {
println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Aug 25 01:14:41 2012
@@ -54,7 +54,7 @@ class DefaultEventHandler[K,V](config: P
}
}
if(outstandingProduceRequests.size > 0) {
- error("Failed to send the following reqeusts: " + outstandingProduceRequests)
+ error("Failed to send the following requests: " + outstandingProduceRequests)
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
}
}
@@ -81,7 +81,7 @@ class DefaultEventHandler[K,V](config: P
}
}
} catch {
- case t: Throwable => error("Failed to send messages")
+ case t: Throwable => error("Failed to send messages", t)
}
failedProduceRequests
case None => // all produce requests failed
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala?rev=1377175&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Log4jController.scala Sat Aug 25 01:14:41 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import org.apache.log4j.{Logger, Level, LogManager}
+import java.util
+
+
+object Log4jController {
+
+ private val controller = new Log4jController
+
+ Utils.registerMBean(controller, "kafka:type=kafka.Log4jController")
+
+}
+
+
+/**
+ * An MBean that allows the user to dynamically alter log4j levels at runtime.
+ * The companion object contains the singleton instance of this class and
+ * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization
+ * of the companion object.
+ */
+private class Log4jController extends Log4jControllerMBean {
+
+ def getLoggers = {
+ val lst = new util.ArrayList[String]()
+ lst.add("root=" + existingLogger("root").getLevel.toString)
+ val loggers = LogManager.getCurrentLoggers
+ while (loggers.hasMoreElements) {
+ val logger = loggers.nextElement().asInstanceOf[Logger]
+ if (logger != null) {
+ val level = if (logger != null) logger.getLevel else null
+ lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null"))
+ }
+ }
+ lst
+ }
+
+
+ private def newLogger(loggerName: String) =
+ if (loggerName == "root")
+ LogManager.getRootLogger
+ else LogManager.getLogger(loggerName)
+
+
+ private def existingLogger(loggerName: String) =
+ if (loggerName == "root")
+ LogManager.getRootLogger
+ else LogManager.exists(loggerName)
+
+
+ def getLogLevel(loggerName: String) = {
+ val log = existingLogger(loggerName)
+ if (log != null) {
+ val level = log.getLevel
+ if (level != null)
+ log.getLevel.toString
+ else "Null log level."
+ }
+ else "No such logger."
+ }
+
+
+ def setLogLevel(loggerName: String, level: String) = {
+ val log = newLogger(loggerName)
+ if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) {
+ log.setLevel(Level.toLevel(level.toUpperCase))
+ true
+ }
+ else false
+ }
+
+}
+
+
+private trait Log4jControllerMBean {
+ def getLoggers: java.util.List[String]
+ def getLogLevel(logger: String): String
+ def setLogLevel(logger: String, level: String): Boolean
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Logging.scala Sat Aug 25 01:14:41 2012
@@ -24,7 +24,10 @@ trait Logging {
lazy val logger = Logger.getLogger(loggerName)
protected var logIdent = ""
-
+
+ // Force initialization to register Log4jControllerMBean
+ private val log4jController = Log4jController
+
private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
def trace(msg: => String): Unit = {
@@ -111,4 +114,4 @@ trait Logging {
def fatal(msg: => String, e: => Throwable) = {
logger.fatal(msgWithLogIdent(msg),e)
}
-}
\ No newline at end of file
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1377175&r1=1377174&r2=1377175&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Aug 25 01:14:41 2012
@@ -525,7 +525,7 @@ object Utils extends Logging {
/**
* Read an unsigned integer from the current position in the buffer,
* incrementing the position by 4 bytes
- * @param The buffer to read from
+ * @param buffer The buffer to read from
* @return The integer read, as a long to avoid signedness
*/
def getUnsignedInt(buffer: ByteBuffer): Long =