You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/06 07:10:10 UTC

svn commit: r1179501 - in /incubator/kafka/trunk/core/src/main/scala/kafka: server/KafkaServer.scala utils/Mx4jLoader.scala

Author: junrao
Date: Thu Oct  6 05:10:09 2011
New Revision: 1179501

URL: http://svn.apache.org/viewvc?rev=1179501&view=rev
Log:
add optional mx4j support; patched by Chris Burroughs; reviewed by Jun Rao; KAFKA-78

Added:
    incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1179501&r1=1179500&r2=1179501&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Thu Oct  6 05:10:09 2011
@@ -22,7 +22,7 @@ import org.apache.log4j.Logger
 import kafka.log.LogManager
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Utils, SystemTime, KafkaScheduler}
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler}
 import kafka.network.{SocketServerStats, SocketServer}
 import java.io.File
 
@@ -72,6 +72,7 @@ class KafkaServer(val config: KafkaConfi
                                       config.maxSocketRequestSize)
       Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName))
       socketServer.startup
+      Mx4jLoader.maybeLoad
       /**
        *  Registers this broker in ZK. After this, consumers can connect to broker.
        *  So this should happen after socket server start.

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala?rev=1179501&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala Thu Oct  6 05:10:09 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+import org.apache.log4j.Logger
+
+/**
+ * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
+ *
+ * The default port is 8082. To override that provide e.g. -Dmx4jport=8083
+ * The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
+ * This feature must be enabled with -Dkafka_mx4jenable=true
+ *
+ * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
+ * */
+object Mx4jLoader {
+  private val logger = Logger.getLogger(getClass())
+
+  /**
+   * Loads the mx4j HTTP adaptor, but only when the kafka_mx4jenable system property is true.
+   * @return true if the adaptor was registered and started, false otherwise
+   */
+  def maybeLoad(): Boolean =
+    Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false) && startHttpAdaptor()
+
+  /** Registers the mx4j HttpAdaptor and XSLTProcessor MBeans and starts the adaptor. */
+  private def startHttpAdaptor(): Boolean = {
+    val address = System.getProperty("mx4jaddress", "0.0.0.0")
+    val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
+    try {
+      logger.debug("Will try to load MX4j now, if it's in the classpath")
+      val mbs = ManagementFactory.getPlatformMBeanServer()
+      // mx4j classes are resolved by name at runtime; a missing jar surfaces as ClassNotFoundException.
+      val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
+      val httpAdaptor = httpAdaptorClass.newInstance()
+      httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
+      httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])
+      mbs.registerMBean(httpAdaptor, new ObjectName("system:name=http"))
+
+      val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
+      val xsltProcessor = xsltProcessorClass.newInstance()
+      httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
+      mbs.registerMBean(xsltProcessor, new ObjectName("Server:name=XSLTProcessor"))
+      httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
+      logger.info("mx4j successfuly loaded")
+      true
+    } catch {
+      case _: ClassNotFoundException =>
+        logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
+        false
+      case e: Throwable =>
+        logger.warn("Could not start register mbean in JMX", e)
+        false
+    }
+  }
+}