You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/17 10:17:19 UTC
svn commit: r1362401 - in /flume/trunk:
flume-ng-core/src/main/java/org/apache/flume/instrumentation/
flume-ng-doc/sphinx/
flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/
Author: mpercy
Date: Tue Jul 17 08:17:19 2012
New Revision: 1362401
URL: http://svn.apache.org/viewvc?rev=1362401&view=rev
Log:
FLUME-1374. Support ganglia reporting.
(Hari Shreedharan via Mike Percy)
Added:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
Modified:
flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import com.google.common.base.Throwables;
+import java.lang.management.ManagementFactory;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.HostInfo;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Ganglia server that polls JMX at a configured frequency (defaults to
+ * once every 60 seconds). This implementation can send data to ganglia 3 and
+ * ganglia 3.1. <p>
+ *
+ * <b>Mandatory Parameters:</b><p>
+ * <tt>hosts: </tt> List of comma separated hostname:ports of ganglia
+ * servers to report metrics to. <p>
+ * <b>Optional Parameters: </b><p>
+ * <tt>pollFrequency:</tt>Interval in seconds between consecutive reports to
+ * ganglia servers. Default = 60 seconds.<p>
+ * <tt>isGanglia3:</tt> Report to ganglia 3 ? Default = false - reports to
+ * ganglia 3.1.
+ *
+ *
+ *
+ */
+
+public class GangliaServer implements MonitorService {
+ /*
+ * The Ganglia protocol specific stuff: the xdr_* methods
+ * and the sendToGanglia* methods have been shamelessly ripped off
+ * from Hadoop. All hail the yellow elephant!
+ */
+
+ private static final Logger logger =
+     LoggerFactory.getLogger(GangliaServer.class);
+
+ /** Size in bytes of the outgoing message buffer (as per libgmond.c). */
+ public static final int BUFFER_SIZE = 1500;
+
+ // Default metadata values written into every metric message by the
+ // createGangliaMessage* methods.
+ public static final String DEFAULT_UNITS = "";
+ public static final int DEFAULT_TMAX = 60;
+ public static final int DEFAULT_DMAX = 0;
+ public static final int DEFAULT_SLOPE = 3;
+ public static final String GANGLIA_DOUBLE_TYPE = "double";
+
+ // Configuration keys and defaults read by configure(Context). These are
+ // compile-time constants, so they are static instead of being carried as a
+ // separate field by every instance; existing "this.CONF_*" style accesses
+ // still compile.
+ public static final String CONF_POLL_FREQUENCY = "pollFrequency";
+ public static final int DEFAULT_POLL_FREQUENCY = 60;
+ public static final String CONF_HOSTS = "hosts";
+ public static final String CONF_ISGANGLIA3 = "isGanglia3";
+
+ /** Scratch buffer the xdr_* methods write the current message into. */
+ protected byte[] buffer = new byte[BUFFER_SIZE];
+ /** Index of the next free byte in {@link #buffer}. */
+ protected int offset;
+ /** Resolved ganglia endpoints; filled by start() and cleared by stop(). */
+ private final List<SocketAddress> addresses = new ArrayList<SocketAddress>();
+ /** UDP socket used for sending; created in start(). */
+ private DatagramSocket socket = null;
+ /** Runs the collector; replaced in start() if it was shut down before. */
+ private ScheduledExecutorService service =
+     Executors.newSingleThreadScheduledExecutor();
+ /** Hosts parsed from the configuration by configure(Context). */
+ private List<HostInfo> hosts;
+ protected final GangliaCollector collectorRunnable;
+ /** Seconds between two polls; read when start() schedules the collector. */
+ private int pollFrequency = DEFAULT_POLL_FREQUENCY;
+ /** When true ganglia 3 messages are built, otherwise ganglia 3.1 ones. */
+ private volatile boolean isGanglia3 = false;
+ /** Local host name written into the messages; resolved in start(). */
+ private String hostname;
+
+ /**
+  * Creates an unconfigured server together with the collector task that
+  * polls JMX. The constructor takes no arguments: the ganglia hosts, the poll
+  * frequency and the protocol version are supplied afterwards through
+  * {@link #configure(Context)}, and nothing is scheduled until
+  * {@link #start()} is called.
+  *
+  * @throws FlumeException declared in the signature; the visible body only
+  * instantiates the collector.
+  */
+ public GangliaServer() throws FlumeException {
+ collectorRunnable = new GangliaCollector();
+ }
+
+ /**
+  * Puts a string into the buffer by first writing the length of its encoded
+  * form as an int, followed by the encoded bytes, padded if necessary to a
+  * multiple of 4.
+  * <p>
+  * The string is always encoded as UTF-8, so the bytes put on the wire do not
+  * depend on the default charset of the JVM the agent happens to run on.
+  *
+  * @param s the string to be written to buffer at offset location
+  * @throws IndexOutOfBoundsException if the encoded string does not fit into
+  * the remaining space of the buffer
+  */
+ protected void xdr_string(String s) {
+   // String.getBytes(Charset) never throws for a supported charset and
+   // UTF-8 is guaranteed to be available on every Java platform.
+   byte[] bytes = s.getBytes(java.nio.charset.Charset.forName("UTF-8"));
+   int len = bytes.length;
+   xdr_int(len);
+   System.arraycopy(bytes, 0, buffer, offset, len);
+   offset += len;
+   pad();
+ }
+
+ /**
+  * Appends zero bytes to the buffer until the write offset is aligned to a
+  * 4 byte boundary. Does nothing when the offset is already aligned.
+  */
+ private void pad() {
+   int overhang = offset % 4;
+   if (overhang == 0) {
+     return;
+   }
+   for (int missing = 4 - overhang; missing > 0; missing--) {
+     buffer[offset++] = 0;
+   }
+ }
+
+ /**
+  * Writes the given integer to the buffer in big-endian order, i.e. most
+  * significant byte first, advancing the write offset by 4.
+  */
+ protected void xdr_int(int i) {
+   // The narrowing cast keeps only the lowest 8 bits of the shifted value.
+   for (int shift = 24; shift >= 0; shift -= 8) {
+     buffer[offset++] = (byte) (i >>> shift);
+   }
+ }
+
+ /**
+  * Sends the first {@code offset} bytes of the buffer as one datagram to each
+  * configured address and then resets the write offset to 0. A failure for
+  * one address is logged and does not prevent sending to the others.
+  */
+ public synchronized void sendToGangliaNodes() {
+   for (SocketAddress target : addresses) {
+     try {
+       socket.send(new DatagramPacket(buffer, offset, target));
+     } catch (Exception sendFailure) {
+       logger.warn("Could not send metrics to metrics server: "
+           + target.toString(), sendFailure);
+     }
+   }
+   offset = 0;
+ }
+
+ /**
+  * Start this server, causing it to poll JMX at the configured frequency.
+  * Opens the UDP socket, resolves the local host name, turns the configured
+  * hosts into socket addresses and schedules the collector.
+  *
+  * @throws FlumeException if the socket for sending metrics cannot be created
+  */
+ @Override
+ public void start() {
+   try {
+     socket = new DatagramSocket();
+   } catch (SocketException ex) {
+     logger.error("Could not create socket for metrics collection.");
+     throw new FlumeException(
+         "Could not create socket for metrics collection.", ex);
+   }
+   // Resolve the host name separately from the socket: a failed lookup must
+   // not leave hostname null, otherwise every message built later fails with
+   // a NullPointerException in xdr_string(hostname).
+   try {
+     hostname = InetAddress.getLocalHost().getHostName();
+   } catch (Exception ex2) {
+     hostname = "localhost";
+     logger.warn("Could not resolve the local host name, reporting metrics"
+         + " as '" + hostname + "'", ex2);
+   }
+   for (HostInfo host : hosts) {
+     addresses.add(new InetSocketAddress(
+         host.getHostName(), host.getPortNumber()));
+   }
+   collectorRunnable.server = this;
+   // An executor that was shut down by stop() cannot be reused.
+   if (service.isShutdown() || service.isTerminated()) {
+     service = Executors.newSingleThreadScheduledExecutor();
+   }
+   service.scheduleWithFixedDelay(collectorRunnable, 0,
+       pollFrequency, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Stop this server.
+ */
+ @Override
+ public void stop() {
+ service.shutdown();
+
+ while (!service.isTerminated()) {
+ try {
+ logger.warn("Waiting for ganglia service to stop");
+ service.awaitTermination(500, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ logger.warn("Interrupted while waiting"
+ + " for ganglia monitor to shutdown", ex);
+ service.shutdownNow();
+ }
+ }
+ addresses.clear();
+ }
+
+ /**
+  * Sets the delay between two JMX polls. The value is handed to the scheduler
+  * in {@link #start()}, so a change made while the server is running only
+  * takes effect after the server has been stopped and started again.
+  *
+  * @param pollFrequency Seconds between consecutive JMX polls.
+  */
+ public void setPollFrequency(int pollFrequency) {
+ this.pollFrequency = pollFrequency;
+ }
+
+ /**
+  * Returns the currently stored poll frequency.
+  *
+  * @return Seconds between consecutive JMX polls
+  */
+ public int getPollFrequency() {
+ return pollFrequency;
+ }
+
+ /**
+  * Selects the message format. The flag is a volatile field that the
+  * collector reads for every attribute it reports.
+  *
+  * @param isGanglia3 When true, ganglia 3 messages will be sent, else Ganglia
+  * 3.1 formatted messages are sent.
+  */
+ public void setIsGanglia3(boolean isGanglia3) {
+ this.isGanglia3 = isGanglia3;
+ }
+
+ /**
+  * Tells which message format is currently selected.
+  *
+  * @return True if the server is currently sending ganglia 3 formatted msgs.
+  * False if the server sends Ganglia 3.1 formatted msgs.
+  */
+ public boolean isGanglia3() {
+ return this.isGanglia3;
+ }
+
+ protected void createGangliaMessage(String name, String value) {
+ logger.debug("Sending ganglia3 formatted message.");
+ name = hostname + "." + name;
+ xdr_int(0);
+ xdr_string("float");
+ xdr_string(name);
+ xdr_string(value);
+ xdr_string(DEFAULT_UNITS);
+ xdr_int(DEFAULT_SLOPE);
+ xdr_int(DEFAULT_TMAX);
+ xdr_int(DEFAULT_DMAX);
+ }
+
+ /**
+  * Builds the two messages ganglia 3.1 expects for a metric. The metadata
+  * message is written to the buffer and sent from within this method; the
+  * value message is then written to the buffer but NOT sent, the caller has
+  * to invoke {@link #sendToGangliaNodes()} for it.
+  *
+  * @param name metric name
+  * @param value metric value in its string form
+  */
+ protected void createGangliaMessage31(String name, String value) {
+ logger.debug("Sending ganglia 3.1 formatted message.");
+ xdr_int(128); // metric_id = metadata_msg
+ xdr_string(hostname); // hostname
+ xdr_string(name); // metric name
+ xdr_int(0); // spoof = False
+ xdr_string("string"); // metric type
+ xdr_string(name); // metric name
+ xdr_string(DEFAULT_UNITS); // units
+ xdr_int(DEFAULT_SLOPE); // slope
+ xdr_int(DEFAULT_TMAX); // tmax, the maximum time between metrics
+ // NOTE(review): in ganglia dmax is, as far as we know, the number of
+ // seconds after which a silent metric is dropped (0 = never), not a
+ // maximum data value - confirm against the ganglia documentation.
+ xdr_int(DEFAULT_DMAX); // dmax
+ xdr_int(1); /*Num of the entries in extra_value field for Ganglia 3.1.x*/
+ xdr_string("GROUP"); /*Group attribute*/
+ xdr_string("flume"); /*Group value*/
+
+ // Flushes the metadata message and resets the buffer offset to 0.
+ this.sendToGangliaNodes();
+
+ // Now we send out a message with the actual value.
+ // Technically, we only need to send out the metadata message once for
+ // each metric, but I don't want to have to record which metrics we did and
+ // did not send.
+ xdr_int(133); // we are sending a string value
+ xdr_string(hostname); // hostName
+ xdr_string(name); // metric name
+ xdr_int(0); // spoof = False
+ xdr_string("%s"); // format field
+ xdr_string(value); // metric value
+ }
+
+ @Override
+ public void configure(Context context) {
+ this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60);
+ String localHosts = context.getString(this.CONF_HOSTS);
+ if(localHosts == null || localHosts.isEmpty()){
+ throw new ConfigurationException("Hosts list cannot be empty.");
+ }
+ this.hosts = this.getHostsFromString(localHosts);
+ this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false);
+ }
+
+
+ /**
+  * Parses a comma separated list of {@code hostname:port} entries. Entries
+  * that have no port or whose port is not a number are logged and skipped.
+  * Whitespace around entries, host names and ports is ignored.
+  *
+  * @param hosts comma separated {@code hostname:port} entries
+  * @return the valid hosts, named {@code ganglia_host-0},
+  * {@code ganglia_host-1}, ... in the order in which they were accepted
+  * @throws FlumeException if the list contains no valid entry
+  */
+ private List<HostInfo> getHostsFromString(String hosts)
+     throws FlumeException {
+   List<HostInfo> hostInfoList = new ArrayList<HostInfo>();
+   String[] hostsAndPorts = hosts.split(",");
+   int i = 0;
+   for (String host : hostsAndPorts) {
+     String[] hostAndPort = host.trim().split(":");
+     if (hostAndPort.length < 2) {
+       // The placeholder is required, otherwise slf4j drops the argument.
+       logger.warn("Invalid ganglia host: {}", host);
+       continue;
+     }
+     try {
+       hostInfoList.add(new HostInfo("ganglia_host-" + i,
+           hostAndPort[0].trim(), Integer.parseInt(hostAndPort[1].trim())));
+       // Advance only for accepted hosts so that each gets a unique name.
+       i++;
+     } catch (Exception e) {
+       logger.warn("Invalid ganglia host: " + host, e);
+     }
+   }
+   if (hostInfoList.isEmpty()) {
+     throw new FlumeException("No valid ganglia hosts defined!");
+   }
+   return hostInfoList;
+ }
+ /**
+  * Worker which polls JMX for all mbeans with
+  * {@link javax.management.ObjectName} within the flume namespace:
+  * org.apache.flume. All attributes of such beans are sent to all the hosts
+  * specified by the server that owns its instance.
+  * <p>
+  * The worker never lets an exception escape from {@link #run()}: a task
+  * scheduled with {@code scheduleWithFixedDelay} is not executed again once
+  * a run has thrown, which would silently end all reporting.
+  */
+ protected class GangliaCollector implements Runnable {
+
+   private GangliaServer server;
+   private final MBeanServer mbeanServer = ManagementFactory.
+       getPlatformMBeanServer();
+
+   @Override
+   public void run() {
+     Set<ObjectInstance> queryMBeans;
+     try {
+       queryMBeans = mbeanServer.queryMBeans(
+           null, null);
+     } catch (Exception ex) {
+       // Skip this poll only; the next scheduled run tries again.
+       logger.error("Could not get Mbeans for monitoring", ex);
+       return;
+     }
+     for (ObjectInstance obj : queryMBeans) {
+       ObjectName objectName = obj.getObjectName();
+       if (!objectName.toString().startsWith("org.apache.flume")) {
+         continue;
+       }
+       try {
+         MBeanAttributeInfo[] attrs = mbeanServer.
+             getMBeanInfo(objectName).getAttributes();
+         String[] strAtts = new String[attrs.length];
+         for (int i = 0; i < strAtts.length; i++) {
+           strAtts[i] = attrs[i].getName();
+         }
+         AttributeList attrList = mbeanServer.getAttributes(
+             objectName, strAtts);
+         for (Object attr : attrList) {
+           Attribute localAttr = (Attribute) attr;
+           Object attrValue = localAttr.getValue();
+           if (attrValue == null) {
+             // A null value must not abort the remaining attributes.
+             logger.debug("Skipping attribute {} of {}: value is null",
+                 localAttr.getName(), objectName);
+             continue;
+           }
+           String metricName = objectName + "." + localAttr.getName();
+           if (server.isGanglia3()) {
+             server.createGangliaMessage(metricName, attrValue.toString());
+           } else {
+             server.createGangliaMessage31(metricName, attrValue.toString());
+           }
+           server.sendToGangliaNodes();
+         }
+       } catch (Exception ex) {
+         logger.error("Error getting mbean attributes", ex);
+         // Drop a possibly half written message so that it does not end up
+         // at the front of the next one.
+         server.offset = 0;
+       }
+     }
+   }
+ }
+}
Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Contract every monitoring service has to fulfil. A service that should be
+ * brought up together with Flume implements this interface; its class name
+ * and any further context it needs are passed in during startup.
+ */
+public interface MonitorService extends Configurable {
+
+  /** Starts the monitoring service. */
+  void start();
+
+  /** Stops the monitoring service. */
+  void stop();
+
+}
Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation;
+
+/**
+ * Known monitoring types, each mapped to the {@link MonitorService}
+ * implementation that provides it. {@link #OTHER} has no implementation
+ * class, so {@link #getMonitorClass()} returns {@code null} for it.
+ */
+public enum MonitoringType {
+  OTHER(null),
+  GANGLIA(GangliaServer.class);
+
+  private final Class<? extends MonitorService> monitoringClass;
+
+  MonitoringType(Class<? extends MonitorService> klass) {
+    monitoringClass = klass;
+  }
+
+  /**
+   * @return the implementation class of this type, may be {@code null}
+   */
+  public Class<? extends MonitorService> getMonitorClass() {
+    return monitoringClass;
+  }
+}
Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1362401&r1=1362400&r2=1362401&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Tue Jul 17 08:17:19 2012
@@ -1744,7 +1744,61 @@ configuring the HDFS sink Kerberos-relat
Monitoring
==========
-TBD
+Monitoring in Flume is still a work in progress. Changes can happen very often.
+Several Flume components report metrics to the JMX platform MBean server. These
+metrics can be queried using Jconsole.
+
+Ganglia Reporting
+-----------------
+Flume can also report these metrics to
+Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a flume agent
+must be started with this support. The Flume agent has to be started by passing
+in the following parameters as system properties prefixed by ``flume.monitoring.``,
+and can be specified in the flume-env.sh:
+
+======================= ======= =====================================================================================
+Property Name Default Description
+======================= ======= =====================================================================================
+**type** -- The component type name, has to be ``GANGLIA``
+**hosts** -- Comma separated list of ``hostname:port``
+pollFrequency 60 Time, in seconds, between consecutive reporting to ganglia server
+isGanglia3 false Ganglia server version is 3. By default, Flume sends in ganglia 3.1 format
+======================= ======= =====================================================================================
+
+We can start Flume with Ganglia support as follows::
+
+ $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=GANGLIA -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
+
+Any custom flume components should use Java MBean ObjectNames which begin
+with ``org.apache.flume`` for Flume to report the metrics to Ganglia. This can
+be done by adding the ObjectName as follows (the name can be anything provided it
+starts with ``org.apache.flume``):
+
+.. code-block:: java
+
+ ObjectName objName = new ObjectName("org.apache.flume." + myClassName + ":type=" + name);
+
+ ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
+
+
+Custom Reporting
+----------------
+It is possible to report metrics to other systems by writing servers that do
+the reporting. Any reporting class has to implement the interface,
+``org.apache.flume.instrumentation.MonitorService``. Such a class can be used
+the same way the GangliaServer is used for reporting. Such classes can query the
+platform MBean server for the metrics of the registered MBeans. For example, an HTTP
+monitoring service called ``HTTPReporting`` can be used as follows::
+
+ $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
+
+======================= ======= ========================================
+Property Name Default Description
+======================= ======= ========================================
+**type** -- The component type name, has to be FQCN
+======================= ======= ========================================
+
+
Troubleshooting
===============
Modified: flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1362401&r1=1362400&r2=1362401&view=diff
==============================================================================
--- flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Tue Jul 17 08:17:19 2012
@@ -34,6 +34,12 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+
public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
implements NodeConfigurationAware {
@@ -45,6 +51,11 @@ public class DefaultLogicalNodeManager e
private LifecycleState lifecycleState;
private NodeConfiguration nodeConfiguration;
+ private MonitorService monitorServer;
+
+ public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+ public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
public DefaultLogicalNodeManager() {
nodeSupervisor = new LifecycleSupervisor();
lifecycleState = LifecycleState.IDLE;
@@ -85,6 +96,9 @@ public class DefaultLogicalNodeManager e
}
}
}
+ if(monitorServer != null) {
+ monitorServer.stop();
+ }
}
@Override
@@ -142,6 +156,8 @@ public class DefaultLogicalNodeManager e
logger.error("Error while starting {}", entry.getValue(), e);
}
}
+
+ this.loadMonitoring();
}
@Override
@@ -212,4 +228,36 @@ public class DefaultLogicalNodeManager e
return lifecycleState;
}
+ private void loadMonitoring() {
+ Properties systemProps = System.getProperties();
+ Set<String> keys = systemProps.stringPropertyNames();
+ try {
+ if (keys.contains(CONF_MONITOR_CLASS)) {
+ String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+ Class<? extends MonitorService> klass;
+ try {
+ //Is it a known type?
+ klass = MonitoringType.valueOf(
+ monitorType.toUpperCase()).getMonitorClass();
+ } catch (Exception e) {
+ //Not a known type, use FQCN
+ klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+ }
+ this.monitorServer = klass.newInstance();
+ Context context = new Context();
+ for (String key : keys) {
+ if (key.startsWith(CONF_MONITOR_PREFIX)) {
+ context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+ systemProps.getProperty(key));
+ }
+ }
+ monitorServer.configure(context);
+ monitorServer.start();
+ }
+ } catch (Exception e) {
+ logger.warn("Error starting monitoring. "
+ + "Monitoring might not be available.", e);
+ }
+
+ }
}