You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/06 07:10:10 UTC
svn commit: r1179501 - in /incubator/kafka/trunk/core/src/main/scala/kafka:
server/KafkaServer.scala utils/Mx4jLoader.scala
Author: junrao
Date: Thu Oct 6 05:10:09 2011
New Revision: 1179501
URL: http://svn.apache.org/viewvc?rev=1179501&view=rev
Log:
add optional mx4j support; patched by Chris Burroughs; reviewed by Jun Rao; KAFKA-78
Added:
incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
Modified:
incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
Modified: incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1179501&r1=1179500&r2=1179501&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Thu Oct 6 05:10:09 2011
@@ -22,7 +22,7 @@ import org.apache.log4j.Logger
import kafka.log.LogManager
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean
-import kafka.utils.{Utils, SystemTime, KafkaScheduler}
+import kafka.utils.{Mx4jLoader, Utils, SystemTime, KafkaScheduler}
import kafka.network.{SocketServerStats, SocketServer}
import java.io.File
@@ -72,6 +72,7 @@ class KafkaServer(val config: KafkaConfi
config.maxSocketRequestSize)
Utils.swallow(logger.warn, Utils.registerMBean(socketServer.stats, statsMBeanName))
socketServer.startup
+ Mx4jLoader.maybeLoad
/**
* Registers this broker in ZK. After this, consumers can connect to broker.
* So this should happen after socket server start.
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala?rev=1179501&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/Mx4jLoader.scala Thu Oct 6 05:10:09 2011
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+
+import java.lang.management.ManagementFactory
+import javax.management.ObjectName
+import org.apache.log4j.Logger
+
+/**
+ * If mx4j-tools is in the classpath call maybeLoad to load the HTTP interface of mx4j.
+ *
+ * This feature must be enabled with -Dkafka_mx4jenable=true
+ * The default port is 8082. To override that provide e.g. -Dmx4jport=8083
+ * The default listen address is 0.0.0.0. To override that provide -Dmx4jaddress=127.0.0.1
+ *
+ * This is a Scala port of org.apache.cassandra.utils.Mx4jTool written by Ran Tavory for CASSANDRA-1068
+ */
+object Mx4jLoader {
+  private val logger = Logger.getLogger(getClass())
+  /** Starts the mx4j HTTP adaptor, but only when the kafka_mx4jenable system property is true.
+   *  @return true only if the adaptor was registered and started; false in every other case */
+  def maybeLoad(): Boolean = {
+    // The if/else is the method's result, so a disabled feature really skips the loading below.
+    if (!Utils.getBoolean(System.getProperties(), "kafka_mx4jenable", false))
+      false
+    else {
+      val address = System.getProperty("mx4jaddress", "0.0.0.0")
+      val port = Utils.getInt(System.getProperties(), "mx4jport", 8082)
+      try {
+        logger.debug("Will try to load MX4j now, if it's in the classpath")
+        val mbs = ManagementFactory.getPlatformMBeanServer()
+        val processorName = new ObjectName("Server:name=XSLTProcessor")
+
+        // mx4j classes are looked up by name and driven through reflection only.
+        val httpAdaptorClass = Class.forName("mx4j.tools.adaptor.http.HttpAdaptor")
+        val httpAdaptor = httpAdaptorClass.newInstance()
+        httpAdaptorClass.getMethod("setHost", classOf[String]).invoke(httpAdaptor, address.asInstanceOf[AnyRef])
+        httpAdaptorClass.getMethod("setPort", Integer.TYPE).invoke(httpAdaptor, port.asInstanceOf[AnyRef])
+        mbs.registerMBean(httpAdaptor, new ObjectName("system:name=http"))
+
+        val xsltProcessorClass = Class.forName("mx4j.tools.adaptor.http.XSLTProcessor")
+        val xsltProcessor = xsltProcessorClass.newInstance()
+        httpAdaptorClass.getMethod("setProcessor", Class.forName("mx4j.tools.adaptor.http.ProcessorMBean")).invoke(httpAdaptor, xsltProcessor.asInstanceOf[AnyRef])
+        mbs.registerMBean(xsltProcessor, processorName)
+        httpAdaptorClass.getMethod("start").invoke(httpAdaptor)
+        logger.info("mx4j successfuly loaded")
+        true
+      } catch {
+        case e: ClassNotFoundException =>
+          logger.info("Will not load MX4J, mx4j-tools.jar is not in the classpath")
+          false
+        case e: Throwable =>
+          logger.warn("Could not start register mbean in JMX", e)
+          false
+      }
+    }
+  }
+}