You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/06 11:48:04 UTC

incubator-eagle git commit: [EAGLE-825] Improve jmx collector with built-in metric filter

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 188ddf500 -> 8416f64da


[EAGLE-825] Improve jmx collector with built-in metric filter

## Changes

* Support wildcard patterns in the metric name filter
* Add built-in filters for important Hadoop and HBase metric names

## JIRA
https://issues.apache.org/jira/browse/EAGLE-825

Author: Hao Chen <ha...@apache.org>

Closes #715 from haoch/ImproveJMXCollector.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8416f64d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8416f64d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8416f64d

Branch: refs/heads/master
Commit: 8416f64da7dcba0c48527aa678a5b2ec9a37d4a8
Parents: 188ddf5
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 6 19:47:50 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 6 19:47:50 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/config-sample.json     |  46 ---------
 .../hadoop_jmx_config-sample.json               |  70 +++++++++++++
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |  15 ++-
 .../hbase_jmx_config-sample.json                |  94 +++++++++++++++++
 .../hadoop_jmx_collector/metric_collector.py    | 101 ++++++++++++++++---
 5 files changed, 265 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json
deleted file mode 100644
index a0670e1..0000000
--- a/eagle-external/hadoop_jmx_collector/config-sample.json
+++ /dev/null
@@ -1,46 +0,0 @@
-{
-  "env": {
-    "site": "sandbox"
-  },
-  "input": [
-    {
-      "component": "namenode",
-      "host": "sandbox.hortonworks.com",
-      "port": "50070",
-      "https": false
-    },
-    {
-      "component": "namenode",
-      "host": "sandbox.hortonworks.com",
-      "port": "50070",
-      "https": false
-    },
-    {
-      "component": "resourcemanager",
-      "host": "sandbox.hortonworks.com",
-      "port": "8088",
-      "https": false
-    },
-    {
-      "component": "resourcemanager",
-      "host": "sandbox.hortonworks.com",
-      "port": "8088",
-      "https": false
-    }
-  ],
-  "filter": {
-    "monitoring.group.selected": ["hadoop", "java.lang"]
-  },
-  "output": {
-    "kafka": {
-      "default_topic": "nn_jmx_metric_sandbox",
-      "component_topic_mapping": {
-        "namenode": "nn_jmx_metric_sandbox",
-        "resourcemanager": "rm_jmx_metric_sandbox"
-      },
-      "broker_list": [
-        "sandbox.hortonworks.com:6667"
-      ]
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
new file mode 100644
index 0000000..07bdc84
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
@@ -0,0 +1,70 @@
+{
+  "env": {
+    "site": "sandbox"
+  },
+  "input": [
+    {
+      "component": "namenode",
+      "host": "sandbox.hortonworks.com",
+      "port": "50070",
+      "https": false
+    },
+    {
+      "component": "resourcemanager",
+      "host": "sandbox.hortonworks.com",
+      "port": "19888",
+      "https": false
+    },
+    {
+      "component": "datanode",
+      "host": "sandbox.hortonworks.com",
+      "port": "50075",
+      "https": false
+    }
+  ],
+  "filter": {
+    "beam_group_filter": ["hadoop","java.lang"],
+    "metric_name_filter": [
+      "hadoop.memory.heapmemoryusage.used",
+      "hadoop.memory.nonheapmemoryusage.used",
+      "hadoop.namenode.fsnamesystemstate.capacitytotal",
+      "hadoop.namenode.dfs.capacityused",
+      "hadoop.namenode.dfs.capacityremaining",
+      "hadoop.namenode.dfs.blockstotal",
+      "hadoop.namenode.dfs.filestotal",
+      "hadoop.namenode.dfs.underreplicatedblocks",
+      "hadoop.namenode.dfs.missingblocks",
+      "hadoop.namenode.dfs.corruptblocks",
+      "hadoop.namenode.dfs.lastcheckpointtime",
+      "hadoop.namenode.dfs.transactionssincelastcheckpoint",
+      "hadoop.namenode.dfs.lastwrittentransactionid",
+      "hadoop.namenode.dfs.snapshottabledirectories",
+      "hadoop.namenode.dfs.snapshots",
+      "hadoop.namenode.rpc.rpcqueuetimeavgtime",
+      "hadoop.namenode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.namenode.rpc.numopenconnections",
+      "hadoop.namenode.rpc.callqueuelength",
+
+      "hadoop.datanode.fsdatasetstate.capacity",
+      "hadoop.datanode.fsdatasetstate.dfsused",
+      "hadoop.datanode.datanodeinfo.xceivercount",
+      "hadoop.datanode.rpc.rpcqueuetimeavgtime",
+      "hadoop.datanode.rpc.rpcprocessingtimeavgtime",
+      "hadoop.datanode.rpc.numopenconnections",
+      "hadoop.datanode.rpc.callqueuelength"
+    ]
+  },
+  "output": {
+    "kafka": {
+      "debug": false,
+      "default_topic": "hadoop_jmx_metric_sandbox",
+      "component_topic_mapping": {
+        "namenode": "nn_jmx_metric_sandbox",
+        "resourcemanager": "rm_jmx_metric_sandbox"
+      },
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index b42bc81..799d351 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -17,7 +17,7 @@
 #
 
 from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
-import json
+import json, logging, fnmatch
 
 class NNSafeModeMetric(JmxMetricListener):
     def on_metric(self, metric):
@@ -28,7 +28,6 @@ class NNSafeModeMetric(JmxMetricListener):
                 metric["value"] = 0
             self.collector.collect(metric)
 
-
 class NNHAMetric(JmxMetricListener):
     PREFIX = "hadoop.namenode.fsnamesystem"
 
@@ -74,6 +73,15 @@ class JournalTransactionInfoMetric(JmxMetricListener):
             self.collector.on_bean_kv(self.PREFIX, component, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
             self.collector.on_bean_kv(self.PREFIX, component, "MostRecentCheckpointTxId", MostRecentCheckpointTxId)
 
+class DatanodeFSDatasetState(JmxMetricListener):
+    def on_metric(self, metric):
+        if fnmatch.fnmatch(metric["metric"], "hadoop.datanode.fsdatasetstate-*.capacity"):
+            metric["metric"] = "hadoop.datanode.fsdatasetstate.capacity"
+            self.collector.collect(metric)
+        elif fnmatch.fnmatch(metric["metric"], "hadoop.datanode.fsdatasetstate-*.dfsused"):
+            metric["metric"] = "hadoop.datanode.fsdatasetstate.dfsused"
+            self.collector.collect(metric)
+
 if __name__ == '__main__':
     collector = JmxMetricCollector()
     collector.register(
@@ -81,6 +89,7 @@ if __name__ == '__main__':
         NNHAMetric(),
         MemoryUsageMetric(),
         NNCapacityUsageMetric(),
-        JournalTransactionInfoMetric()
+        JournalTransactionInfoMetric(),
+        DatanodeFSDatasetState()
     )
     Runner.run(collector)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
new file mode 100644
index 0000000..906f134
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
@@ -0,0 +1,94 @@
+{
+  "env": {
+    "site": "sandbox"
+  },
+  "input": [
+    {
+      "component": "hbasemaster",
+      "host": "sandbox.hortonworks.com",
+      "port": "60010",
+      "https": false
+    },
+    {
+      "component": "regionserver",
+      "host": "sandbox.hortonworks.com",
+      "port": "60030",
+      "https": false
+    }
+  ],
+  "filter": {
+    "beam_group_filter": ["hadoop","java.lang","java.nio"],
+    "metric_name_filter": [
+      "hadoop.memory.heapmemoryusage.used",
+      "hadoop.memory.nonheapmemoryusage.used",
+      "hadoop.bufferpool.direct.memoryused",
+      "hadoop.hbase.master.server.averageload",
+      "hadoop.hbase.master.assignmentmanger.ritcount",
+      "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold",
+      "hadoop.hbase.master.assignmentmanger.assign_num_ops",
+      "hadoop.hbase.master.assignmentmanger.assign_min",
+      "hadoop.hbase.master.assignmentmanger.assign_max",
+      "hadoop.hbase.master.assignmentmanger.assign_75th_percentile",
+      "hadoop.hbase.master.assignmentmanger.assign_95th_percentile",
+      "hadoop.hbase.master.assignmentmanger.assign_99th_percentile",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_min",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_max",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile",
+      "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile",
+      "hadoop.hbase.master.balancer.balancercluster_num_ops",
+      "hadoop.hbase.master.balancer.balancercluster_min",
+      "hadoop.hbase.master.balancer.balancercluster_max",
+      "hadoop.hbase.master.balancer.balancercluster_75th_percentile",
+      "hadoop.hbase.master.balancer.balancercluster_95th_percentile",
+      "hadoop.hbase.master.balancer.balancercluster_99th_percentile",
+      "hadoop.hbase.master.filesystem.hlogsplittime_min",
+      "hadoop.hbase.master.filesystem.hlogsplittime_max",
+      "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile",
+      "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile",
+      "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile",
+      "hadoop.hbase.master.filesystem.hlogsplitsize_min",
+      "hadoop.hbase.master.filesystem.hlogsplitsize_max",
+      "hadoop.hbase.master.filesystem.metahlogsplittime_min",
+      "hadoop.hbase.master.filesystem.metahlogsplittime_max",
+      "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile",
+      "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile",
+      "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile",
+      "hadoop.hbase.master.filesystem.metahlogsplitsize_min",
+      "hadoop.hbase.master.filesystem.metahlogsplitsize_max",
+
+      "hadoop.hbase.jvm.gccount",
+      "hadoop.hbase.jvm.gctimemillis",
+      "hadoop.hbase.ipc.ipc.queuesize",
+      "hadoop.hbase.ipc.ipc.numcallsingeneralqueue",
+      "hadoop.hbase.ipc.ipc.numactivehandler",
+      "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile",
+      "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile",
+      "hadoop.hbase.ipc.ipc.queuecalltime_num_ops",
+      "hadoop.hbase.ipc.ipc.processcalltime_num_ops",
+      "hadoop.hbase.regionserver.server.regioncount",
+      "hadoop.hbase.regionserver.server.storecount",
+      "hadoop.hbase.regionserver.server.memstoresize",
+      "hadoop.hbase.regionserver.server.storefilesize",
+      "hadoop.hbase.regionserver.server.totalrequestcount",
+      "hadoop.hbase.regionserver.server.readrequestcount",
+      "hadoop.hbase.regionserver.server.writerequestcount",
+      "hadoop.hbase.regionserver.server.splitqueuelength",
+      "hadoop.hbase.regionserver.server.compactionqueuelength",
+      "hadoop.hbase.regionserver.server.flushqueuelength",
+      "hadoop.hbase.regionserver.server.blockcachesize",
+      "hadoop.hbase.regionserver.server.blockcachehitcount",
+      "hadoop.hbase.regionserver.server.blockcounthitpercent"
+    ]
+  },
+  "output": {
+    "kafka": {
+      "debug": false,
+      "default_topic": "hadoop_jmx_metric_sandbox",
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c6eac35..4b7b209 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -27,6 +27,7 @@ import types
 import httplib
 import logging
 import threading
+import fnmatch
 
 # load six
 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -40,6 +41,7 @@ logging.basicConfig(level=logging.INFO,
                     format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
                     datefmt='%m-%d %H:%M')
 
+
 class Helper:
     def __init__(self):
         pass
@@ -213,6 +215,11 @@ class KafkaMetricSender(MetricSender):
         self.broker_list = kafka_config["broker_list"]
         self.kafka_client = None
         self.kafka_producer = None
+        self.debug_enabled = False
+        self.sent_count = 0
+        if kafka_config.has_key("debug"):
+            self.debug_enabled = bool(kafka_config["debug"])
+            logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled))
 
     def get_topic_id(self, msg):
         if msg.has_key("component"):
@@ -232,16 +239,23 @@ class KafkaMetricSender(MetricSender):
                                              batch_send_every_t=30)
 
     def send(self, msg):
+        if self.debug_enabled:
+            logging.info("Send message: " + str(msg))
+        self.sent_count += 1
         self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
 
     def close(self):
+        logging.info("Totally sent " + str(self.sent_count) + " metric events")
         if self.kafka_producer is not None:
             self.kafka_producer.stop()
         if self.kafka_client is not None:
             self.kafka_client.close()
 
 class MetricCollector(threading.Thread):
-    def __init__(self,config = None):
+    filters = []
+    config = None
+
+    def __init__(self, config=None):
         threading.Thread.__init__(self)
         self.config = None
         self.sender = None
@@ -251,7 +265,19 @@ class MetricCollector(threading.Thread):
         self.config = config
         self.sender = KafkaMetricSender(self.config)
         self.sender.open()
-        pass
+        self.filter(MetricNameFilter())
+
+        for filter in self.filters:
+            filter.init(self.config)
+
+    def filter(self, *filters):
+        """
+        :param filters: MetricFilters to register
+        :return: None
+        """
+        logging.debug("Register filters: " + str(filters))
+        for filter in filters:
+            self.filters.append(filter)
 
     def start(self):
         super(MetricCollector, self).start()
@@ -265,7 +291,15 @@ class MetricCollector(threading.Thread):
             raise Exception("host is null: " + str(msg))
         if not msg.has_key("site"):
             msg["site"] = self.config["env"]["site"]
-        self.sender.send(msg)
+        if len(self.filters) == 0:
+            self.sender.send(msg)
+            return
+        else:
+            for filter in self.filters:
+                if filter.filter_metric(msg):
+                    self.sender.send(msg)
+                    return
+        # logging.info("Drop metric: " + str(msg))
 
     def close(self):
         self.sender.close()
@@ -273,6 +307,7 @@ class MetricCollector(threading.Thread):
     def run(self):
         raise Exception("`run` method should be overrode by sub-class before being called")
 
+
 class Runner(object):
     @staticmethod
     def run(*collectors):
@@ -288,7 +323,7 @@ class Runner(object):
         elif len(argv) == 2:
             config = Helper.load_config(argv[1])
         else:
-            raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+            raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
 
         for collector in collectors:
             try:
@@ -305,10 +340,12 @@ class Runner(object):
             finally:
                 collector.close()
 
+
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
     listeners = []
     input_components = []
+    metric_prefix = "hadoop."
 
     def init(self, config):
         super(JmxMetricCollector, self).init(config)
@@ -322,8 +359,10 @@ class JmxMetricCollector(MetricCollector):
                 raise Exception("port not defined in " + str(input))
             if not input.has_key("https"):
                 input["https"] = False
-
-        self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
+        self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('beam_group_filter')]
+        if config["env"].has_key("metric_prefix"):
+            self.metric_prefix = config["env"]["metric_prefix"]
+            logging.info("Override env.metric_prefix: " + self.metric_prefix + ", default: hadoop.")
 
     def register(self, *listeners):
         """
@@ -347,7 +386,7 @@ class JmxMetricCollector(MetricCollector):
 
     def on_beans(self, source, beans):
         for bean in beans:
-            self.on_bean(source,bean)
+            self.on_bean(source, bean)
 
     def on_bean(self, source, bean):
         # mbean is of the form "domain:key=value,...,foo=bar"
@@ -357,7 +396,6 @@ class JmxMetricCollector(MetricCollector):
 
         if not self.filter_bean(bean, mbean_domain):
             return
-
         context = bean.get("tag.Context", "")
         metric_prefix_name = self.__build_metric_prefix(mbean_attribute, context)
 
@@ -367,7 +405,7 @@ class JmxMetricCollector(MetricCollector):
         for listener in self.listeners:
             listener.on_bean(source, bean.copy())
 
-    def on_bean_kv(self, prefix,source, key, value):
+    def on_bean_kv(self, prefix, source, key, value):
         # Skip Tags
         if re.match(r'tag.*', key):
             return
@@ -382,7 +420,9 @@ class JmxMetricCollector(MetricCollector):
     def on_metric(self, metric):
         if Helper.is_number(metric["value"]):
             self.collect(metric)
-
+        elif isinstance(metric["value"], dict):
+            for key, value in metric["value"].iteritems():
+                self.on_bean_kv(metric["metric"], metric, key, value)
         for listener in self.listeners:
             listener.on_metric(metric.copy())
 
@@ -394,14 +434,51 @@ class JmxMetricCollector(MetricCollector):
             name_index = [i[0] for i in mbean_list].index('name')
             mbean_list[name_index][1] = context
             metric_prefix_name = '.'.join([i[1] for i in mbean_list])
-        return ("hadoop." + metric_prefix_name).replace(" ", "").lower()
+        return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower()
 
+# ========================
+#  Metric Listeners
+# ========================
 class JmxMetricListener:
     def init(self, collector):
         self.collector = collector
+        self.metric_prefix = self.collector.metric_prefix
 
     def on_bean(self, component, bean):
         pass
 
     def on_metric(self, metric):
-        pass
\ No newline at end of file
+        pass
+
+
+# ========================
+#  Metric Filters
+# ========================
+class MetricFilter:
+    def init(self, config={}):
+        raise Exception("init() method is called before being overrode in sub-class")
+        pass
+
+    def filter_metric(self, metric):
+        """
+        Filter metric to keep by return True, otherwise throw metric by returning False.
+        """
+        return True
+
+class MetricNameFilter(MetricFilter):
+    metric_name_filter = []
+
+    def init(self, config={}):
+        if config.has_key("filter") and config["filter"].has_key("metric_name_filter"):
+            self.metric_name_filter = config["filter"]["metric_name_filter"]
+
+        logging.debug("Override filter.metric_name_filter: " + str(self.metric_name_filter))
+
+    def filter_metric(self, metric):
+        if len(self.metric_name_filter) == 0:
+            return True
+        else:
+            for name_filter in self.metric_name_filter:
+                if fnmatch.fnmatch(metric["metric"], name_filter):
+                    return True
+        return False
\ No newline at end of file