You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/29 08:32:30 UTC

incubator-eagle git commit: [EAGLE-807] Refactor JMX Metric Collector Script

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 286034ff3 -> fb853dfd8


[EAGLE-807] Refactor JMX Metric Collector Script

Refactor JMX Metric Collector Script
* Support multiple kafka topics
* Support HA checking logic.

Author: Hao Chen <ha...@apache.org>

Closes #695 from haoch/fixJMXCollectorScript.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/fb853dfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/fb853dfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/fb853dfd

Branch: refs/heads/master
Commit: fb853dfd8ce722a8ee4ee970f9416b2dbfc79d8c
Parents: 286034f
Author: Hao Chen <ha...@apache.org>
Authored: Tue Nov 29 16:32:22 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Nov 29 16:32:22 2016 +0800

----------------------------------------------------------------------
 eagle-external/hadoop_jmx_collector/README.md   |  94 ++++++++---------
 .../hadoop_jmx_collector/config-sample.json     |  57 +++++-----
 .../hadoop_jmx_collector/hadoop_ha_checker.py   |  59 ++++++-----
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  53 +++++-----
 .../hadoop_jmx_collector/metric_collector.py    | 103 +++++++++++--------
 5 files changed, 191 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/README.md
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/README.md b/eagle-external/hadoop_jmx_collector/README.md
index bd89600..b11c081 100644
--- a/eagle-external/hadoop_jmx_collector/README.md
+++ b/eagle-external/hadoop_jmx_collector/README.md
@@ -18,81 +18,75 @@ limitations under the License.
 -->
 
 
-# Hadoop JMX Collector to Kafka
+# Hadoop Jmx Collector
 
-Python script to collect JMX metrics for any Hadoop component, and send it to Kafka topic
+These scripts help to collect Hadoop jmx and evently sent the metrics to stdout or Kafka. Tested with Python 2.7.
 
 ### How to use it
 
-  1. Edit the configuration file config.json. For example:
-            ```
+  1. Edit the configuration file (json file). For example:
+  
             {
-             "env": {
-              "site": "sandbox"
-             },
-             "inputs": [
+              "env": {
+                "site": "sandbox"
+              },
+              "input": [
                 {
                   "component": "namenode",
-                  "host": "127.0.0.1",
+                  "host": "sandbox.hortonworks.com",
                   "port": "50070",
-                  "https": false,
-                  "kafka_topic": "nn_jmx_metric_sandbox"
+                  "https": false
                 },
                 {
                   "component": "resourcemanager",
-                  "host": "127.0.0.1",
+                  "host": "sandbox.hortonworks.com",
                   "port": "8088",
-                  "https": false,
-                  "kafka_topic": "rm_jmx_metric_sandbox"
-                },
-                {
-                  "component": "datanode",
-                  "host": "127.0.0.1",
-                  "port": "50075",
-                  "https": false,
-                  "kafka_topic": "dn_jmx_metric_sandbox"
+                  "https": false
+                }
+              ],
+              "filter": {
+                "monitoring.group.selected": ["hadoop", "java.lang"]
+              },
+              "output": {
+                "kafka": {
+                  "default_topic": "nn_jmx_metric_sandbox",
+                  "component_topic_mapping": {
+                    "namenode": "nn_jmx_metric_sandbox",
+                    "resourcemanager": "rm_jmx_metric_sandbox"
+                  },
+                  "broker_list": [
+                    "sandbox.hortonworks.com:6667"
+                  ]
                 }
-             ],
-             "filter": {
-              "monitoring.group.selected": ["hadoop", "java.lang"]
-             },
-             "output": {
-             }
+              }
             }
-            ```
-  2. Run the scripts
-
-        ```
-        python hadoop_jmx_kafka.py
-        ```
 
-### Editing config.json
-
-* inputs
+  2. Run the scripts
+  
+        # for general use
+        python hadoop_jmx_kafka.py > 1.txt
 
-  "port" defines the hadoop service port, such as 50070 => "namenode", 16010 => "hbasemaster".
-  Like the example above, you can specify multiple hadoop components to collect
+### Edit `eagle-collector.conf`
 
-  "https" is whether or not you want to use SSL protocol in your connection to the host+port
+* input (monitored hosts)
 
-  "kafka_topic" is the kafka topic that you want to populate with the jmx data from the respective component
+  "port" defines the hadoop service port, such as 50070 => "namenode", 60010 => "hbase master".
 
 * filter
 
   "monitoring.group.selected" can filter out beans which we care about.
 
-* output
+* output 
+  
+  if we left it empty, then the output is stdout by default. 
+
+        "output": {}
+        
+  It also supports Kafka as its output. 
 
-  You can specify the kafka broker list
-        ```
         "output": {
           "kafka": {
-            "brokerList": [ "localhost:9092"]
+            "topic": "test_topic",
+            "broker_list": [ "sandbox.hortonworks.com:6667"]
           }
         }
-        ```
-
-  To check that the a desired kafka topic is being populated:
-    ```
-    kafka-console-consumer --zookeeper localhost:2181 --topic nn_jmx_metric_sandbox
-    ```

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json
index 9245f1a..a0670e1 100644
--- a/eagle-external/hadoop_jmx_collector/config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/config-sample.json
@@ -1,55 +1,46 @@
 {
   "env": {
-    "site": "sandbox",
-    "name_node": {
-      "hosts": [
-        "sandbox.hortonworks.com"
-      ],
-      "port": 50070,
+    "site": "sandbox"
+  },
+  "input": [
+    {
+      "component": "namenode",
+      "host": "sandbox.hortonworks.com",
+      "port": "50070",
       "https": false
     },
-    "resource_manager": {
-      "hosts": [
-        "sandbox.hortonworks.com"
-      ],
-      "port": 50030,
-      "https": false
-    }
-  },
-  "inputs": [
     {
       "component": "namenode",
-      "host": "server.eagle.apache.org",
+      "host": "sandbox.hortonworks.com",
       "port": "50070",
-      "https": false,
-      "kafka_topic": "nn_jmx_metric_sandbox"
+      "https": false
     },
     {
       "component": "resourcemanager",
-      "host": "server.eagle.apache.org",
+      "host": "sandbox.hortonworks.com",
       "port": "8088",
-      "https": false,
-      "kafka_topic": "rm_jmx_metric_sandbox"
+      "https": false
     },
     {
-      "component": "datanode",
-      "host": "server.eagle.apache.org",
-      "port": "50075",
-      "https": false,
-      "kafka_topic": "dn_jmx_metric_sandbox"
+      "component": "resourcemanager",
+      "host": "sandbox.hortonworks.com",
+      "port": "8088",
+      "https": false
     }
   ],
   "filter": {
-    "monitoring.group.selected": [
-      "hadoop",
-      "java.lang"
-    ]
+    "monitoring.group.selected": ["hadoop", "java.lang"]
   },
   "output": {
     "kafka": {
-      "brokerList": [
-        "localhost:9092"
+      "default_topic": "nn_jmx_metric_sandbox",
+      "component_topic_mapping": {
+        "namenode": "nn_jmx_metric_sandbox",
+        "resourcemanager": "rm_jmx_metric_sandbox"
+      },
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
       ]
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
index c33b327..e8bf4ae 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_ha_checker.py
@@ -17,18 +17,22 @@
 #
 
 from metric_collector import MetricCollector, JmxReader, YarnWSReader, Runner
-import logging
+import logging,socket
 
 class HadoopNNHAChecker(MetricCollector):
     def run(self):
-        if not self.config["env"].has_key("name_node"):
-            logging.warn("Do nothing for HadoopNNHAChecker as config of env.name_node not found")
+        hosts = []
+
+        for input in self.config["input"]:
+            if not input.has_key("host"):
+                input["host"] = socket.getfqdn()
+            if input.has_key("component") and input["component"] == "namenode":
+                hosts.append(input)
+        if not bool(hosts):
+            logging.warn("non hosts are configured as 'namenode' in 'input' config, exit")
             return
-        name_node_config = self.config["env"]["name_node"]
-        hosts = name_node_config["hosts"]
-        port = name_node_config["port"]
-        https = name_node_config["https"]
 
+        logging.info("Checking namenode HA: " + str(hosts))
         total_count = len(hosts)
 
         self.collect({
@@ -43,17 +47,21 @@ class HadoopNNHAChecker(MetricCollector):
 
         for host in hosts:
             try:
-                bean = JmxReader(host, port, https).open().get_jmx_bean_by_name(
+                bean = JmxReader(host["host"], host["port"], host["https"]).open().get_jmx_bean_by_name(
                         "Hadoop:service=NameNode,name=FSNamesystem")
-                logging.debug(host + " is " + bean["tag.HAState"])
-                if bean["tag.HAState"] == "active":
-                    active_count += 1
+                if not bean:
+                    logging.error("JMX Bean[Hadoop:service=NameNode,name=FSNamesystem] is null from " + host["host"])
+                if bean.has_key("tag.HAState"):
+                    logging.debug(str(host) + " is " + bean["tag.HAState"])
+                    if bean["tag.HAState"] == "active":
+                        active_count += 1
+                    else:
+                        standby_count += 1
                 else:
-                    standby_count += 1
+                    logging.info("'tag.HAState' not found from jmx of " + host["host"] + ":" + host["port"])
             except Exception as e:
-                logging.exception("failed to read jmx from " + host)
+                logging.exception("failed to read jmx from " + host["host"] + ":" + host["port"])
                 failed_count += 1
-
         self.collect({
             "component": "namenode",
             "metric": "hadoop.namenode.hastate.active.count",
@@ -72,17 +80,19 @@ class HadoopNNHAChecker(MetricCollector):
             "value": failed_count
         })
 
-
 class HadoopRMHAChecker(MetricCollector):
     def run(self):
-        if not self.config["env"].has_key("resource_manager"):
-            logging.warn("Do nothing for HadoopRMHAChecker as config of env.resource_manager not found")
+        hosts = []
+        for input in self.config["input"]:
+            if not input.has_key("host"):
+                input["host"] = socket.getfqdn()
+            if input.has_key("component") and input["component"] == "resourcemanager":
+                hosts.append(input)
+        if not bool(hosts):
+            logging.warn("Non hosts are configured as 'resourcemanager' in 'input' config, exit")
             return
-        name_node_config = self.config["env"]["resource_manager"]
-        hosts = name_node_config["hosts"]
-        port = name_node_config["port"]
-        https = name_node_config["https"]
 
+        logging.info("Checking resource manager HA: " + str(hosts))
         total_count = len(hosts)
 
         self.collect({
@@ -97,13 +107,16 @@ class HadoopRMHAChecker(MetricCollector):
 
         for host in hosts:
             try:
-                cluster_info = YarnWSReader(host, port, https).read_cluster_info()
+                cluster_info = YarnWSReader(host["host"], host["port"], host["https"]).read_cluster_info()
+                if not cluster_info:
+                    logging.error("Cluster info is null from web service of " + host["host"])
+                    raise Exception("cluster info is null from " + host["host"])
                 if cluster_info["clusterInfo"]["haState"] == "ACTIVE":
                     active_count += 1
                 else:
                     standby_count += 1
             except Exception as e:
-                logging.exception("Failed to read yarn ws from " + host)
+                logging.error("Failed to read yarn ws from " + str(host))
                 failed_count += 1
 
         self.collect({

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 6eca7d9..b42bc81 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -16,11 +16,8 @@
 # limitations under the License.
 #
 
-from metric_collector import JmxMetricCollector,JmxMetricListener,Runner,MetricCollector,Helper
+from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
 import json
-import logging
-
-logging.basicConfig(level=logging.INFO,format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',datefmt='%m-%d %H:%M')
 
 class NNSafeModeMetric(JmxMetricListener):
     def on_metric(self, metric):
@@ -35,53 +32,55 @@ class NNSafeModeMetric(JmxMetricListener):
 class NNHAMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.fsnamesystem"
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystem":
             if bean[u"tag.HAState"] == "active":
-                self.collector.on_bean_kv(self.PREFIX, "hastate", 0)
+                self.collector.on_bean_kv(self.PREFIX, component, "hastate", 0)
             else:
-                self.collector.on_bean_kv(self.PREFIX, "hastate", 1)
+                self.collector.on_bean_kv(self.PREFIX, component, "hastate", 1)
 
 
-class MemortUsageMetric(JmxMetricListener):
+class MemoryUsageMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.jvm"
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         if bean["name"] == "Hadoop:service=NameNode,name=JvmMetrics":
             memnonheapusedusage = round(float(bean['MemNonHeapUsedM']) / float(bean['MemNonHeapMaxM']) * 100.0, 2)
-            self.collector.on_bean_kv(self.PREFIX, "memnonheapusedusage", memnonheapusedusage)
-            memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100, 2)
-            self.collector.on_bean_kv(self.PREFIX, "memnonheapcommittedusage", memnonheapcommittedusage)
+            self.collector.on_bean_kv(self.PREFIX, component, "memnonheapusedusage", memnonheapusedusage)
+            memnonheapcommittedusage = round(float(bean['MemNonHeapCommittedM']) / float(bean['MemNonHeapMaxM']) * 100,
+                                             2)
+            self.collector.on_bean_kv(self.PREFIX, component, "memnonheapcommittedusage", memnonheapcommittedusage)
             memheapusedusage = round(float(bean['MemHeapUsedM']) / float(bean['MemHeapMaxM']) * 100, 2)
-            self.collector.on_bean_kv(self.PREFIX, "memheapusedusage", memheapusedusage)
+            self.collector.on_bean_kv(self.PREFIX, component,"memheapusedusage", memheapusedusage)
             memheapcommittedusage = round(float(bean['MemHeapCommittedM']) / float(bean['MemHeapMaxM']) * 100, 2)
-            self.collector.on_bean_kv(self.PREFIX, "memheapcommittedusage", memheapcommittedusage)
-
+            self.collector.on_bean_kv(self.PREFIX, component, "memheapcommittedusage", memheapcommittedusage)
 
 class NNCapacityUsageMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.fsnamesystemstate"
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         if bean["name"] == "Hadoop:service=NameNode,name=FSNamesystemState":
             capacityusage = round(float(bean['CapacityUsed']) / float(bean['CapacityTotal']) * 100, 2)
-            self.collector.on_bean_kv(self.PREFIX, "capacityusage", capacityusage)
+            self.collector.on_bean_kv(self.PREFIX, component, "capacityusage", capacityusage)
 
 class JournalTransactionInfoMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.journaltransaction"
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         if bean.has_key("JournalTransactionInfo"):
             JournalTransactionInfo = json.loads(bean.get("JournalTransactionInfo"))
             LastAppliedOrWrittenTxId = float(JournalTransactionInfo.get("LastAppliedOrWrittenTxId"))
             MostRecentCheckpointTxId = float(JournalTransactionInfo.get("MostRecentCheckpointTxId"))
-            self.collector.on_bean_kv(self.PREFIX, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
-            self.collector.on_bean_kv(self.PREFIX, "MostRecentCheckpointTxId", MostRecentCheckpointTxId)
-
+            self.collector.on_bean_kv(self.PREFIX, component, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
+            self.collector.on_bean_kv(self.PREFIX, component, "MostRecentCheckpointTxId", MostRecentCheckpointTxId)
 
 if __name__ == '__main__':
-    config = Helper.load_config()
-
-    for ip in config['inputs']:
-        collector = JmxMetricCollector(ip['component'], ip['host'], ip['port'], ip['https'], ip['kafka_topic'])
-        collector.register(NNSafeModeMetric(), NNHAMetric(), MemortUsageMetric(), JournalTransactionInfoMetric(), NNCapacityUsageMetric())
-        Runner.run(collector)
+    collector = JmxMetricCollector()
+    collector.register(
+        NNSafeModeMetric(),
+        NNHAMetric(),
+        MemoryUsageMetric(),
+        NNCapacityUsageMetric(),
+        JournalTransactionInfoMetric()
+    )
+    Runner.run(collector)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/fb853dfd/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 79320cc..e165ea2 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -40,7 +40,6 @@ logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
                     datefmt='%m-%d %H:%M')
 
-
 class Helper:
     def __init__(self):
         pass
@@ -186,19 +185,40 @@ class KafkaMetricSender(MetricSender):
         super(KafkaMetricSender, self).__init__(config)
         kafka_config = config["output"]["kafka"]
         # default topic
-        # self.topic = kafka_config["topic"].encode('utf-8')
+        self.default_topic = None
+        if kafka_config.has_key("default_topic"):
+            self.default_topic = kafka_config["default_topic"].encode('utf-8')
+        self.component_topic_mapping = {}
+        if kafka_config.has_key("component_topic_mapping"):
+            self.component_topic_mapping = kafka_config["component_topic_mapping"]
+
+        if not self.default_topic and not bool(self.component_topic_mapping):
+            raise Exception("both kafka config 'topic' and 'component_topic_mapping' are empty")
+
         # producer
-        self.broker_list = kafka_config["brokerList"]
+        self.broker_list = kafka_config["broker_list"]
         self.kafka_client = None
         self.kafka_producer = None
 
+    def get_topic_id(self, msg):
+        if msg.has_key("component"):
+            component = msg["component"]
+            if self.component_topic_mapping.has_key(component):
+                return self.component_topic_mapping[component]
+            else:
+                return self.default_topic
+        else:
+            if not self.default_topic:
+                raise Exception("no default topic found for unknown-component msg: " + str(msg))
+            return self.default_topic
+
     def open(self):
         self.kafka_client = KafkaClient(self.broker_list, timeout=59)
         self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
                                              batch_send_every_t=30)
 
-    def send(self, msg, topic):
-        self.kafka_producer.send_messages(topic, json.dumps(msg))
+    def send(self, msg):
+        self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
 
     def close(self):
         if self.kafka_producer is not None:
@@ -206,23 +226,15 @@ class KafkaMetricSender(MetricSender):
         if self.kafka_client is not None:
             self.kafka_client.close()
 
-
 class MetricCollector(threading.Thread):
-    def __init__(self, comp, host, port, https, topic):
+    def __init__(self):
         threading.Thread.__init__(self)
-
-        self.comp = comp
-        self.host = host
-        self.port = port
-        self.https = https
-        self.topic = topic
-
         self.config = Helper.load_config()
         self.sender = KafkaMetricSender(self.config)
         self.fqdn = socket.getfqdn()
-        self.init(self.config, self.comp, self.host, self.port, self.https, self.topic)
+        self.init(self.config)
 
-    def init(self, config, comp, host, port, https, topic):
+    def init(self, config):
         pass
 
     def start(self):
@@ -242,7 +254,7 @@ class MetricCollector(threading.Thread):
         if not msg.has_key("site"):
             msg["site"] = self.config["env"]["site"]
 
-        self.sender.send(msg, self.topic)
+        self.sender.send(msg)
 
     def run(self):
         raise Exception("`run` method should be overrode by sub-class before being called")
@@ -258,21 +270,28 @@ class Runner(object):
         :return:
         """
         for thread in threads:
-            thread.start()
+            try:
+                thread.start()
+            except Exception as e:
+                logging.exception(e)
 
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
-    component = None
-    https = False
-    port = None
     listeners = []
+    input_components = []
+
+    def init(self, config):
+        self.input_components = config["input"]
+        for input in self.input_components:
+            if not input.has_key("host"):
+                input["host"] = self.fqdn
+            if not input.has_key("component"):
+                raise Exception("component not defined in " + str(input))
+            if not input.has_key("port"):
+                raise Exception("port not defined in " + str(input))
+            if not input.has_key("https"):
+                input["https"] = False
 
-    def init(self, config, comp, host, port, https, topic):
-        self.host = host
-        self.port = port
-        self.https = https
-        self.component = comp
-        self.topic = topic
         self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
         self.listeners = []
 
@@ -286,20 +305,21 @@ class JmxMetricCollector(MetricCollector):
             self.listeners.append(listener)
 
     def run(self):
-        try:
-            beans = JmxReader(self.host, self.port, self.https).open().get_jmx_beans()
-            self.on_beans(beans)
-        except Exception as e:
-            logging.exception("Failed to read jmx for " + self.host)
+        for input in self.input_components:
+            try:
+                beans = JmxReader(input["host"], input["port"], input["https"]).open().get_jmx_beans()
+                self.on_beans(input["component"], beans)
+            except Exception as e:
+                logging.exception("Failed to read jmx for " + str(input))
 
     def filter_bean(self, bean, mbean_domain):
         return mbean_domain in self.selected_domain
 
-    def on_beans(self, beans):
+    def on_beans(self, component, beans):
         for bean in beans:
-            self.on_bean(bean)
+            self.on_bean(component,bean)
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         # mbean is of the form "domain:key=value,...,foo=bar"
         mbean = bean[u'name']
         mbean_domain, mbean_attribute = mbean.rstrip().split(":", 1)
@@ -313,24 +333,23 @@ class JmxMetricCollector(MetricCollector):
 
         # print kafka_dict
         for key, value in bean.iteritems():
-            self.on_bean_kv(metric_prefix_name, key, value)
+            self.on_bean_kv(metric_prefix_name, component,key, value)
 
         for listener in self.listeners:
-            listener.on_bean(bean.copy())
+            listener.on_bean(component, bean.copy())
 
-    def on_bean_kv(self, prefix, key, value):
+    def on_bean_kv(self, prefix,component, key, value):
         # Skip Tags
         if re.match(r'tag.*', key):
             return
         metric_name = (prefix + '.' + key).lower()
         self.on_metric({
+            "component": component,
             "metric": metric_name,
             "value": value
         })
 
     def on_metric(self, metric):
-        metric["component"] = self.component
-
         if Helper.is_number(metric["value"]):
             self.collect(metric)
 
@@ -352,8 +371,8 @@ class JmxMetricListener:
     def init(self, collector):
         self.collector = collector
 
-    def on_bean(self, bean):
+    def on_bean(self, component, bean):
         pass
 
     def on_metric(self, metric):
-        pass
+        pass
\ No newline at end of file