You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/17 10:17:19 UTC

svn commit: r1362401 - in /flume/trunk: flume-ng-core/src/main/java/org/apache/flume/instrumentation/ flume-ng-doc/sphinx/ flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/

Author: mpercy
Date: Tue Jul 17 08:17:19 2012
New Revision: 1362401

URL: http://svn.apache.org/viewvc?rev=1362401&view=rev
Log:
FLUME-1374. Support ganglia reporting.

(Hari Shreedharan via Mike Percy)

Added:
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
    flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
Modified:
    flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
    flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java

Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/GangliaServer.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import com.google.common.base.Throwables;
+import java.lang.management.ManagementFactory;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import org.apache.flume.Context;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.HostInfo;
+import org.apache.flume.conf.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Ganglia server that polls JMX based at a configured frequency (defaults to
+ * once every 60 seconds). This implementation can send data to ganglia 3 and
+ * ganglia 3.1. <p>
+ *
+ * <b>Mandatory Parameters:</b><p>
+ * <tt>hosts: </tt> List of comma separated hostname:ports of ganglia
+ * servers to report metrics to. <p>
+ * <b>Optional Parameters: </b><p>
+ * <tt>pollFrequency:</tt>Interval in seconds between consecutive reports to
+ * ganglia servers. Default = 60 seconds.<p>
+ * <tt>isGanglia3:</tt> Report to ganglia 3 ? Default = false - reports to
+ * ganglia 3.1.
+ *
+ *
+ *
+ */
+
+public class GangliaServer implements MonitorService {
+  /*
+   * The Ganglia protocol specific stuff: the xdr_* methods
+   * and the sendToGanglia* methods have been shamelessly ripped off
+   * from Hadoop. All hail the yellow elephant!
+   */
+
+  private static final Logger logger =
+          LoggerFactory.getLogger(GangliaServer.class);
+  public static final int BUFFER_SIZE = 1500; // as per libgmond.c
+  protected byte[] buffer = new byte[BUFFER_SIZE];
+  protected int offset;
+  private final List<SocketAddress> addresses = new ArrayList<SocketAddress>();
+  private DatagramSocket socket = null;
+  private ScheduledExecutorService service =
+          Executors.newSingleThreadScheduledExecutor();
+  private List<HostInfo> hosts;
+  protected final GangliaCollector collectorRunnable;
+  private int pollFrequency = 60;
+  public static final String DEFAULT_UNITS = "";
+  public static final int DEFAULT_TMAX = 60;
+  public static final int DEFAULT_DMAX = 0;
+  public static final int DEFAULT_SLOPE = 3;
+  public static final String GANGLIA_DOUBLE_TYPE = "double";
+  private volatile boolean isGanglia3 = false;
+  private String hostname;
+  public final String CONF_POLL_FREQUENCY = "pollFrequency";
+  public final int DEFAULT_POLL_FREQUENCY = 60;
+  public final String CONF_HOSTS = "hosts";
+  public final String CONF_ISGANGLIA3 = "isGanglia3";
+
+  /**
+   *
+   * @param hosts List of hosts to send the metrics to. All of them have to be
+   * running the version of ganglia specified by the configuration.
+   * @throws FlumeException
+   */
+  public GangliaServer() throws FlumeException {
+    collectorRunnable = new GangliaCollector();
+  }
+
+  /**
+   * Puts a string into the buffer by first writing the size of the string as an
+   * int, followed by the bytes of the string, padded if necessary to a multiple
+   * of 4.
+   *
+   * @param s the string to be written to buffer at offset location
+   */
+  protected void xdr_string(String s) {
+    byte[] bytes = s.getBytes();
+    int len = bytes.length;
+    xdr_int(len);
+    System.arraycopy(bytes, 0, buffer, offset, len);
+    offset += len;
+    pad();
+  }
+
+  /**
+   * Pads the buffer with zero bytes up to the nearest multiple of 4.
+   */
+  private void pad() {
+    int newOffset = ((offset + 3) / 4) * 4;
+    while (offset < newOffset) {
+      buffer[offset++] = 0;
+    }
+  }
+
+  /**
+   * Puts an integer into the buffer as 4 bytes, big-endian.
+   */
+  protected void xdr_int(int i) {
+    buffer[offset++] = (byte) ((i >> 24) & 0xff);
+    buffer[offset++] = (byte) ((i >> 16) & 0xff);
+    buffer[offset++] = (byte) ((i >> 8) & 0xff);
+    buffer[offset++] = (byte) (i & 0xff);
+  }
+
+  public synchronized void sendToGangliaNodes() {
+    DatagramPacket packet;
+    for (SocketAddress addr : addresses) {
+      try {
+        packet = new DatagramPacket(buffer, offset, addr);
+        socket.send(packet);
+      } catch (Exception ex) {
+        logger.warn("Could not send metrics to metrics server: "
+                + addr.toString(), ex);
+      }
+    }
+    offset = 0;
+  }
+
+  /**
+   * Start this server, causing it to poll JMX at the configured frequency.
+   */
+  @Override
+  public void start() {
+    try {
+      socket = new DatagramSocket();
+      hostname = InetAddress.getLocalHost().getHostName();
+    } catch (SocketException ex) {
+      logger.error("Could not create socket for metrics collection.");
+      throw new FlumeException(
+              "Could not create socket for metrics collection.", ex);
+    } catch (Exception ex2) {
+      logger.warn("Unknown error occured", ex2);
+    }
+    for (HostInfo host : hosts) {
+      addresses.add(new InetSocketAddress(
+              host.getHostName(), host.getPortNumber()));
+    }
+    collectorRunnable.server = this;
+    if (service.isShutdown() || service.isTerminated()) {
+      service = Executors.newSingleThreadScheduledExecutor();
+    }
+    service.scheduleWithFixedDelay(collectorRunnable, 0,
+            pollFrequency, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Stop this server.
+   */
+  @Override
+  public void stop() {
+    service.shutdown();
+
+    while (!service.isTerminated()) {
+      try {
+        logger.warn("Waiting for ganglia service to stop");
+        service.awaitTermination(500, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException ex) {
+        logger.warn("Interrupted while waiting"
+                + " for ganglia monitor to shutdown", ex);
+        service.shutdownNow();
+      }
+    }
+    addresses.clear();
+  }
+
+  /**
+   *
+   * @param pollFrequency Seconds between consecutive JMX polls.
+   */
+  public void setPollFrequency(int pollFrequency) {
+    this.pollFrequency = pollFrequency;
+  }
+
+  /**
+   *
+   * @return Seconds between consecutive JMX polls
+   */
+  public int getPollFrequency() {
+    return pollFrequency;
+  }
+
+  /**
+   *
+   * @param isGanglia3 When true, ganglia 3 messages will be sent, else Ganglia
+   * 3.1 formatted messages are sent.
+   */
+  public void setIsGanglia3(boolean isGanglia3) {
+    this.isGanglia3 = isGanglia3;
+  }
+
+  /**
+   *
+   * @return True if the server is currently sending ganglia 3 formatted msgs.
+   * False if the server returns Ganglia 3.1
+   */
+  public boolean isGanglia3() {
+    return this.isGanglia3;
+  }
+
+  protected void createGangliaMessage(String name, String value) {
+    logger.debug("Sending ganglia3 formatted message.");
+    name = hostname + "." + name;
+    xdr_int(0);
+    xdr_string("float");
+    xdr_string(name);
+    xdr_string(value);
+    xdr_string(DEFAULT_UNITS);
+    xdr_int(DEFAULT_SLOPE);
+    xdr_int(DEFAULT_TMAX);
+    xdr_int(DEFAULT_DMAX);
+  }
+
+  protected void createGangliaMessage31(String name, String value) {
+    logger.debug("Sending ganglia 3.1 formatted message.");
+    xdr_int(128); // metric_id = metadata_msg
+    xdr_string(hostname); // hostname
+    xdr_string(name); // metric name
+    xdr_int(0); // spoof = False
+    xdr_string("string"); // metric type
+    xdr_string(name); // metric name
+    xdr_string(DEFAULT_UNITS); // units
+    xdr_int(DEFAULT_SLOPE); // slope
+    xdr_int(DEFAULT_TMAX); // tmax, the maximum time between metrics
+    xdr_int(DEFAULT_DMAX); // dmax, the maximum data value
+    xdr_int(1); /*Num of the entries in extra_value field for Ganglia 3.1.x*/
+    xdr_string("GROUP"); /*Group attribute*/
+    xdr_string("flume"); /*Group value*/
+
+    this.sendToGangliaNodes();
+
+    // Now we send out a message with the actual value.
+    // Technically, we only need to send out the metadata message once for
+    // each metric, but I don't want to have to record which metrics we did and
+    // did not send.
+    xdr_int(133); // we are sending a string value
+    xdr_string(hostname); // hostName
+    xdr_string(name); // metric name
+    xdr_int(0); // spoof = False
+    xdr_string("%s"); // format field
+    xdr_string(value); // metric value
+  }
+
+  @Override
+  public void configure(Context context) {
+    this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60);
+    String localHosts = context.getString(this.CONF_HOSTS);
+    if(localHosts == null || localHosts.isEmpty()){
+      throw new ConfigurationException("Hosts list cannot be empty.");
+    }
+    this.hosts = this.getHostsFromString(localHosts);
+    this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false);
+  }
+
+
+  private List<HostInfo> getHostsFromString(String hosts)
+          throws FlumeException {
+    List<HostInfo> hostInfoList = new ArrayList<HostInfo>();
+    String[] hostsAndPorts = hosts.split(",");
+    int i = 0;
+    for (String host : hostsAndPorts) {
+      String[] hostAndPort = host.split(":");
+      if (hostAndPort.length < 2) {
+        logger.warn("Invalid ganglia host: ", host);
+        continue;
+      }
+      try {
+        hostInfoList.add(new HostInfo("ganglia_host-" + String.valueOf(i),
+                hostAndPort[0], Integer.parseInt(hostAndPort[1])));
+      } catch (Exception e) {
+        logger.warn("Invalid ganglia host: " + host, e);
+        continue;
+      }
+    }
+    if (hostInfoList.isEmpty()) {
+      throw new FlumeException("No valid ganglia hosts defined!");
+    }
+    return hostInfoList;
+  }
+  /**
+   * Worker which polls JMX for all mbeans with
+   * {@link javax.management.ObjectName} within the flume namespace:
+   * org.apache.flume. All attributes of such beans are sent to the all hosts
+   * specified by the server that owns it's instance.
+   *
+   */
+  protected class GangliaCollector implements Runnable {
+
+    private GangliaServer server;
+    private final MBeanServer mbeanServer = ManagementFactory.
+            getPlatformMBeanServer();
+
+    @Override
+    public void run() {
+      Set<ObjectInstance> queryMBeans = null;
+      try {
+        queryMBeans = mbeanServer.queryMBeans(
+                null, null);
+      } catch (Exception ex) {
+        logger.error("Could not get Mbeans for monitoring", ex);
+        Throwables.propagate(ex);
+      }
+      for (ObjectInstance obj : queryMBeans) {
+        try {
+          if (!obj.getObjectName().toString().startsWith("org.apache.flume")) {
+            continue;
+          }
+          MBeanAttributeInfo[] attrs = mbeanServer.
+                  getMBeanInfo(obj.getObjectName()).getAttributes();
+          String strAtts[] = new String[attrs.length];
+          for (int i = 0; i < strAtts.length; i++) {
+            strAtts[i] = attrs[i].getName();
+          }
+          AttributeList attrList = mbeanServer.getAttributes(
+                  obj.getObjectName(), strAtts);
+          for (Object attr : attrList) {
+            Attribute localAttr = (Attribute) attr;
+            if (isGanglia3) {
+              server.createGangliaMessage(obj.getObjectName() + "."
+                      + localAttr.getName(),
+                      localAttr.getValue().toString());
+            } else {
+              server.createGangliaMessage31(obj.getObjectName() + "."
+                      + localAttr.getName(),
+                      localAttr.getValue().toString());
+            }
+            server.sendToGangliaNodes();
+          }
+        } catch (Exception ex) {
+          logger.error("Error getting mbean attributes", ex);
+        }
+      }
+    }
+  }
+}

Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitorService.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import org.apache.flume.conf.Configurable;
+
+/**
+ * Interface that any monitoring service should implement. If the monitor
+ * service is to be started up when Flume starts, it should implement this
+ * and the class name should be passed in during startup, with any additional
+ * context it requires.
+ */
+public interface MonitorService extends Configurable {
+
+  public void start();
+
+  public void stop();
+
+}

Added: flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java?rev=1362401&view=auto
==============================================================================
--- flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java (added)
+++ flume/trunk/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoringType.java Tue Jul 17 08:17:19 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.instrumentation;
+
+/**
+ * Enum for Monitoring types.
+ */
+public enum MonitoringType {
+  OTHER(null),
+  GANGLIA(org.apache.flume.instrumentation.GangliaServer.class);
+
+  private Class<? extends MonitorService> monitoringClass;
+
+  private MonitoringType(Class<? extends MonitorService> klass) {
+    this.monitoringClass = klass;
+  }
+
+  public Class<? extends MonitorService> getMonitorClass(){
+    return this.monitoringClass;
+  }
+}

Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1362401&r1=1362400&r2=1362401&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Tue Jul 17 08:17:19 2012
@@ -1744,7 +1744,61 @@ configuring the HDFS sink Kerberos-relat
 Monitoring
 ==========
 
-TBD
+Monitoring in Flume is still a work in progress. Changes can happen very often.
+Several Flume components report metrics to the JMX platform MBean server. These
+metrics can be queried using Jconsole.
+
+Ganglia Reporting
+-----------------
+Flume can also report these metrics to
+Ganglia 3 or Ganglia 3.1 metanodes. To report metrics to Ganglia, a flume agent
+must be started with this support. The Flume agent has to be started by passing
+in the following parameters as system properties prefixed by ``flume.monitoring.``,
+and can be specified in the flume-env.sh:
+
+=======================  =======  =====================================================================================
+Property Name            Default  Description
+=======================  =======  =====================================================================================
+**type**                 --       The component type name, has to be ``GANGLIA``
+**hosts**                --       Comma separated list of ``hostname:port``
+pollInterval             60       Time, in seconds, between consecutive reporting to ganglia server
+isGanglia3               false    Ganglia server version is 3. By default, Flume sends in ganglia 3.1 format
+=======================  =======  =====================================================================================
+
+We can start Flume with Ganglia support as follows::
+
+  $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=GANGLIA -Dflume.monitoring.hosts=com.example:1234,com.example2:5455
+
+Any custom flume components should use Java MBean ObjectNames which begin
+with ``org.apache.flume`` for Flume to report the metrics to Ganglia. This can
+be done by adding the ObjectName as follows(the name can be anything provided it
+starts with ``org.apache.flume``):
+
+.. code-block:: java
+
+  ObjectName objName = new ObjectName("org.apache.flume." + myClassName + ":type=" + name);
+
+  ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
+
+
+Custom Reporting
+----------------
+It is possible to report metrics to other systems by writing servers that do
+the reporting. Any reporting class has to implement the interface,
+``org.apache.flume.instrumentation.MonitorService``. Such a class can be used
+the same way the GangliaServer is used for reporting. They can poll the platform
+mbean server to poll the mbeans for metrics. For example, if an HTTP
+monitoring service called ``HTTPReporting`` can be used as follows::
+
+  $ bin/flume-ng agent --conf-file example.conf --name agent1 -Dflume.monitoring.type=com.example.reporting.HTTPReporting -Dflume.monitoring.node=com.example:332
+
+=======================  =======  ========================================
+Property Name            Default  Description
+=======================  =======  ========================================
+**type**                 --       The component type name, has to be FQCN
+=======================  =======  ========================================
+
+
 
 Troubleshooting
 ===============

Modified: flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java?rev=1362401&r1=1362400&r2=1362401&view=diff
==============================================================================
--- flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java (original)
+++ flume/trunk/flume-ng-node/src/main/java/org/apache/flume/node/nodemanager/DefaultLogicalNodeManager.java Tue Jul 17 08:17:19 2012
@@ -34,6 +34,12 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+
 
 public class DefaultLogicalNodeManager extends AbstractLogicalNodeManager
     implements NodeConfigurationAware {
@@ -45,6 +51,11 @@ public class DefaultLogicalNodeManager e
   private LifecycleState lifecycleState;
   private NodeConfiguration nodeConfiguration;
 
+  private MonitorService monitorServer;
+
+  public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+  public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
   public DefaultLogicalNodeManager() {
     nodeSupervisor = new LifecycleSupervisor();
     lifecycleState = LifecycleState.IDLE;
@@ -85,6 +96,9 @@ public class DefaultLogicalNodeManager e
         }
       }
     }
+    if(monitorServer != null) {
+      monitorServer.stop();
+    }
   }
 
   @Override
@@ -142,6 +156,8 @@ public class DefaultLogicalNodeManager e
         logger.error("Error while starting {}", entry.getValue(), e);
       }
     }
+
+    this.loadMonitoring();
   }
 
   @Override
@@ -212,4 +228,36 @@ public class DefaultLogicalNodeManager e
     return lifecycleState;
   }
 
+  private void loadMonitoring() {
+    Properties systemProps = System.getProperties();
+    Set<String> keys = systemProps.stringPropertyNames();
+    try {
+      if (keys.contains(CONF_MONITOR_CLASS)) {
+        String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+        Class<? extends MonitorService> klass;
+        try {
+          //Is it a known type?
+          klass = MonitoringType.valueOf(
+                  monitorType.toUpperCase()).getMonitorClass();
+        } catch (Exception e) {
+          //Not a known type, use FQCN
+          klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+        }
+        this.monitorServer = klass.newInstance();
+        Context context = new Context();
+        for (String key : keys) {
+          if (key.startsWith(CONF_MONITOR_PREFIX)) {
+            context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+                    systemProps.getProperty(key));
+          }
+        }
+        monitorServer.configure(context);
+        monitorServer.start();
+      }
+    } catch (Exception e) {
+      logger.warn("Error starting monitoring. "
+              + "Monitoring might not be available.", e);
+    }
+
+  }
 }