You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/06 11:48:04 UTC
incubator-eagle git commit: [EAGLE-825] Improve jmx collector with built-in metric filter
Repository: incubator-eagle
Updated Branches:
refs/heads/master 188ddf500 -> 8416f64da
[EAGLE-825] Improve jmx collector with built-in metric filter
## Changes
* Support wildcard (glob-style) patterns in the metric name filter
* Add built-in metric name filters for the important Hadoop and HBase metrics
## JIRA
https://issues.apache.org/jira/browse/EAGLE-825
Author: Hao Chen <ha...@apache.org>
Closes #715 from haoch/ImproveJMXCollector.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/8416f64d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/8416f64d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/8416f64d
Branch: refs/heads/master
Commit: 8416f64da7dcba0c48527aa678a5b2ec9a37d4a8
Parents: 188ddf5
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 6 19:47:50 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 6 19:47:50 2016 +0800
----------------------------------------------------------------------
.../hadoop_jmx_collector/config-sample.json | 46 ---------
.../hadoop_jmx_config-sample.json | 70 +++++++++++++
.../hadoop_jmx_collector/hadoop_jmx_kafka.py | 15 ++-
.../hbase_jmx_config-sample.json | 94 +++++++++++++++++
.../hadoop_jmx_collector/metric_collector.py | 101 ++++++++++++++++---
5 files changed, 265 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/config-sample.json b/eagle-external/hadoop_jmx_collector/config-sample.json
deleted file mode 100644
index a0670e1..0000000
--- a/eagle-external/hadoop_jmx_collector/config-sample.json
+++ /dev/null
@@ -1,46 +0,0 @@
-{
- "env": {
- "site": "sandbox"
- },
- "input": [
- {
- "component": "namenode",
- "host": "sandbox.hortonworks.com",
- "port": "50070",
- "https": false
- },
- {
- "component": "namenode",
- "host": "sandbox.hortonworks.com",
- "port": "50070",
- "https": false
- },
- {
- "component": "resourcemanager",
- "host": "sandbox.hortonworks.com",
- "port": "8088",
- "https": false
- },
- {
- "component": "resourcemanager",
- "host": "sandbox.hortonworks.com",
- "port": "8088",
- "https": false
- }
- ],
- "filter": {
- "monitoring.group.selected": ["hadoop", "java.lang"]
- },
- "output": {
- "kafka": {
- "default_topic": "nn_jmx_metric_sandbox",
- "component_topic_mapping": {
- "namenode": "nn_jmx_metric_sandbox",
- "resourcemanager": "rm_jmx_metric_sandbox"
- },
- "broker_list": [
- "sandbox.hortonworks.com:6667"
- ]
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
new file mode 100644
index 0000000..07bdc84
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
@@ -0,0 +1,70 @@
+{
+ "env": {
+ "site": "sandbox"
+ },
+ "input": [
+ {
+ "component": "namenode",
+ "host": "sandbox.hortonworks.com",
+ "port": "50070",
+ "https": false
+ },
+ {
+ "component": "resourcemanager",
+ "host": "sandbox.hortonworks.com",
+ "port": "8088",
+ "https": false
+ },
+ {
+ "component": "datanode",
+ "host": "sandbox.hortonworks.com",
+ "port": "50075",
+ "https": false
+ }
+ ],
+ "filter": {
+ "beam_group_filter": ["hadoop","java.lang"],
+ "metric_name_filter": [
+ "hadoop.memory.heapmemoryusage.used",
+ "hadoop.memory.nonheapmemoryusage.used",
+ "hadoop.namenode.fsnamesystemstate.capacitytotal",
+ "hadoop.namenode.dfs.capacityused",
+ "hadoop.namenode.dfs.capacityremaining",
+ "hadoop.namenode.dfs.blockstotal",
+ "hadoop.namenode.dfs.filestotal",
+ "hadoop.namenode.dfs.underreplicatedblocks",
+ "hadoop.namenode.dfs.missingblocks",
+ "hadoop.namenode.dfs.corruptblocks",
+ "hadoop.namenode.dfs.lastcheckpointtime",
+ "hadoop.namenode.dfs.transactionssincelastcheckpoint",
+ "hadoop.namenode.dfs.lastwrittentransactionid",
+ "hadoop.namenode.dfs.snapshottabledirectories",
+ "hadoop.namenode.dfs.snapshots",
+ "hadoop.namenode.rpc.rpcqueuetimeavgtime",
+ "hadoop.namenode.rpc.rpcprocessingtimeavgtime",
+ "hadoop.namenode.rpc.numopenconnections",
+ "hadoop.namenode.rpc.callqueuelength",
+
+ "hadoop.datanode.fsdatasetstate.capacity",
+ "hadoop.datanode.fsdatasetstate.dfsused",
+ "hadoop.datanode.datanodeinfo.xceivercount",
+ "hadoop.datanode.rpc.rpcqueuetimeavgtime",
+ "hadoop.datanode.rpc.rpcprocessingtimeavgtime",
+ "hadoop.datanode.rpc.numopenconnections",
+ "hadoop.datanode.rpc.callqueuelength"
+ ]
+ },
+ "output": {
+ "kafka": {
+ "debug": false,
+ "default_topic": "hadoop_jmx_metric_sandbox",
+ "component_topic_mapping": {
+ "namenode": "nn_jmx_metric_sandbox",
+ "resourcemanager": "rm_jmx_metric_sandbox"
+ },
+ "broker_list": [
+ "sandbox.hortonworks.com:6667"
+ ]
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index b42bc81..799d351 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -17,7 +17,7 @@
#
from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
-import json
+import json, logging, fnmatch
class NNSafeModeMetric(JmxMetricListener):
def on_metric(self, metric):
@@ -28,7 +28,6 @@ class NNSafeModeMetric(JmxMetricListener):
metric["value"] = 0
self.collector.collect(metric)
-
class NNHAMetric(JmxMetricListener):
PREFIX = "hadoop.namenode.fsnamesystem"
@@ -74,6 +73,15 @@ class JournalTransactionInfoMetric(JmxMetricListener):
self.collector.on_bean_kv(self.PREFIX, component, "LastAppliedOrWrittenTxId", LastAppliedOrWrittenTxId)
self.collector.on_bean_kv(self.PREFIX, component, "MostRecentCheckpointTxId", MostRecentCheckpointTxId)
class DatanodeFSDatasetState(JmxMetricListener):
    """Re-emit DataNode FSDatasetState capacity/dfsused under fixed names.

    Metric names of the form ``hadoop.datanode.fsdatasetstate-<suffix>.capacity``
    and ``hadoop.datanode.fsdatasetstate-<suffix>.dfsused`` are renamed to
    ``hadoop.datanode.fsdatasetstate.capacity`` / ``.dfsused`` (dropping the
    suffix) and passed to the collector again, so a fixed metric-name filter
    entry can match them. Any other metric is ignored by this listener.
    """

    # (glob pattern, replacement metric name); checked in order, first match wins.
    _NAME_REWRITES = (
        ("hadoop.datanode.fsdatasetstate-*.capacity",
         "hadoop.datanode.fsdatasetstate.capacity"),
        ("hadoop.datanode.fsdatasetstate-*.dfsused",
         "hadoop.datanode.fsdatasetstate.dfsused"),
    )

    def on_metric(self, metric):
        """Rename a matching FSDatasetState metric in place and collect it."""
        current_name = metric["metric"]
        for pattern, stable_name in self._NAME_REWRITES:
            if not fnmatch.fnmatch(current_name, pattern):
                continue
            metric["metric"] = stable_name
            self.collector.collect(metric)
            return
+
if __name__ == '__main__':
collector = JmxMetricCollector()
collector.register(
@@ -81,6 +89,7 @@ if __name__ == '__main__':
NNHAMetric(),
MemoryUsageMetric(),
NNCapacityUsageMetric(),
- JournalTransactionInfoMetric()
+ JournalTransactionInfoMetric(),
+ DatanodeFSDatasetState()
)
Runner.run(collector)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
new file mode 100644
index 0000000..906f134
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
@@ -0,0 +1,94 @@
+{
+ "env": {
+ "site": "sandbox"
+ },
+ "input": [
+ {
+ "component": "hbasemaster",
+ "host": "sandbox.hortonworks.com",
+ "port": "60010",
+ "https": false
+ },
+ {
+ "component": "regionserver",
+ "host": "sandbox.hortonworks.com",
+ "port": "60030",
+ "https": false
+ }
+ ],
+ "filter": {
+ "beam_group_filter": ["hadoop","java.lang","java.nio"],
+ "metric_name_filter": [
+ "hadoop.memory.heapmemoryusage.used",
+ "hadoop.memory.nonheapmemoryusage.used",
+ "hadoop.bufferpool.direct.memoryused",
+ "hadoop.hbase.master.server.averageload",
+ "hadoop.hbase.master.assignmentmanger.ritcount",
+ "hadoop.hbase.master.assignmentmanger.ritcountoverthreshold",
+ "hadoop.hbase.master.assignmentmanger.assign_num_ops",
+ "hadoop.hbase.master.assignmentmanger.assign_min",
+ "hadoop.hbase.master.assignmentmanger.assign_max",
+ "hadoop.hbase.master.assignmentmanger.assign_75th_percentile",
+ "hadoop.hbase.master.assignmentmanger.assign_95th_percentile",
+ "hadoop.hbase.master.assignmentmanger.assign_99th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_num_ops",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_min",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_max",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_75th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_95th_percentile",
+ "hadoop.hbase.master.assignmentmanger.bulkassign_99th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_num_ops",
+ "hadoop.hbase.master.balancer.balancercluster_min",
+ "hadoop.hbase.master.balancer.balancercluster_max",
+ "hadoop.hbase.master.balancer.balancercluster_75th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_95th_percentile",
+ "hadoop.hbase.master.balancer.balancercluster_99th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_min",
+ "hadoop.hbase.master.filesystem.hlogsplittime_max",
+ "hadoop.hbase.master.filesystem.hlogsplittime_75th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_95th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplittime_99th_percentile",
+ "hadoop.hbase.master.filesystem.hlogsplitsize_min",
+ "hadoop.hbase.master.filesystem.hlogsplitsize_max",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_min",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_max",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_75th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_95th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplittime_99th_percentile",
+ "hadoop.hbase.master.filesystem.metahlogsplitsize_min",
+ "hadoop.hbase.master.filesystem.metahlogsplitsize_max",
+
+ "hadoop.hbase.jvm.gccount",
+ "hadoop.hbase.jvm.gctimemillis",
+ "hadoop.hbase.ipc.ipc.queuesize",
+ "hadoop.hbase.ipc.ipc.numcallsingeneralqueue",
+ "hadoop.hbase.ipc.ipc.numactivehandler",
+ "hadoop.hbase.ipc.ipc.queuecalltime_99th_percentile",
+ "hadoop.hbase.ipc.ipc.processcalltime_99th_percentile",
+ "hadoop.hbase.ipc.ipc.queuecalltime_num_ops",
+ "hadoop.hbase.ipc.ipc.processcalltime_num_ops",
+ "hadoop.hbase.regionserver.server.regioncount",
+ "hadoop.hbase.regionserver.server.storecount",
+ "hadoop.hbase.regionserver.server.memstoresize",
+ "hadoop.hbase.regionserver.server.storefilesize",
+ "hadoop.hbase.regionserver.server.totalrequestcount",
+ "hadoop.hbase.regionserver.server.readrequestcount",
+ "hadoop.hbase.regionserver.server.writerequestcount",
+ "hadoop.hbase.regionserver.server.splitqueuelength",
+ "hadoop.hbase.regionserver.server.compactionqueuelength",
+ "hadoop.hbase.regionserver.server.flushqueuelength",
+ "hadoop.hbase.regionserver.server.blockcachesize",
+ "hadoop.hbase.regionserver.server.blockcachehitcount",
+ "hadoop.hbase.regionserver.server.blockcounthitpercent"
+ ]
+ },
+ "output": {
+ "kafka": {
+ "debug": false,
+ "default_topic": "hadoop_jmx_metric_sandbox",
+ "broker_list": [
+ "sandbox.hortonworks.com:6667"
+ ]
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/8416f64d/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c6eac35..4b7b209 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -27,6 +27,7 @@ import types
import httplib
import logging
import threading
+import fnmatch
# load six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -40,6 +41,7 @@ logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
datefmt='%m-%d %H:%M')
+
class Helper:
def __init__(self):
pass
@@ -213,6 +215,11 @@ class KafkaMetricSender(MetricSender):
self.broker_list = kafka_config["broker_list"]
self.kafka_client = None
self.kafka_producer = None
+ self.debug_enabled = False
+ self.sent_count = 0
+ if kafka_config.has_key("debug"):
+ self.debug_enabled = bool(kafka_config["debug"])
+ logging.info("Overrode output.kafka.debug: " + str(self.debug_enabled))
def get_topic_id(self, msg):
if msg.has_key("component"):
@@ -232,16 +239,23 @@ class KafkaMetricSender(MetricSender):
batch_send_every_t=30)
def send(self, msg):
+ if self.debug_enabled:
+ logging.info("Send message: " + str(msg))
+ self.sent_count += 1
self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
def close(self):
+ logging.info("Totally sent " + str(self.sent_count) + " metric events")
if self.kafka_producer is not None:
self.kafka_producer.stop()
if self.kafka_client is not None:
self.kafka_client.close()
class MetricCollector(threading.Thread):
- def __init__(self,config = None):
+ filters = []
+ config = None
+
+ def __init__(self, config=None):
threading.Thread.__init__(self)
self.config = None
self.sender = None
@@ -251,7 +265,19 @@ class MetricCollector(threading.Thread):
self.config = config
self.sender = KafkaMetricSender(self.config)
self.sender.open()
- pass
+ self.filter(MetricNameFilter())
+
+ for filter in self.filters:
+ filter.init(self.config)
+
+ def filter(self, *filters):
+ """
+ :param filters: MetricFilters to register
+ :return: None
+ """
+ logging.debug("Register filters: " + str(filters))
+ for filter in filters:
+ self.filters.append(filter)
def start(self):
super(MetricCollector, self).start()
@@ -265,7 +291,15 @@ class MetricCollector(threading.Thread):
raise Exception("host is null: " + str(msg))
if not msg.has_key("site"):
msg["site"] = self.config["env"]["site"]
- self.sender.send(msg)
+ if len(self.filters) == 0:
+ self.sender.send(msg)
+ return
+ else:
+ for filter in self.filters:
+ if filter.filter_metric(msg):
+ self.sender.send(msg)
+ return
+ # logging.info("Drop metric: " + str(msg))
def close(self):
self.sender.close()
@@ -273,6 +307,7 @@ class MetricCollector(threading.Thread):
def run(self):
raise Exception("`run` method should be overrode by sub-class before being called")
+
class Runner(object):
@staticmethod
def run(*collectors):
@@ -288,7 +323,7 @@ class Runner(object):
elif len(argv) == 2:
config = Helper.load_config(argv[1])
else:
- raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+ raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
for collector in collectors:
try:
@@ -305,10 +340,12 @@ class Runner(object):
finally:
collector.close()
+
class JmxMetricCollector(MetricCollector):
selected_domain = None
listeners = []
input_components = []
+ metric_prefix = "hadoop."
def init(self, config):
super(JmxMetricCollector, self).init(config)
@@ -322,8 +359,10 @@ class JmxMetricCollector(MetricCollector):
raise Exception("port not defined in " + str(input))
if not input.has_key("https"):
input["https"] = False
-
- self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('monitoring.group.selected')]
+ self.selected_domain = [s.encode('utf-8') for s in config[u'filter'].get('beam_group_filter')]
+ if config["env"].has_key("metric_prefix"):
+ self.metric_prefix = config["env"]["metric_prefix"]
+ logging.info("Override env.metric_prefix: " + self.metric_prefix + ", default: hadoop.")
def register(self, *listeners):
"""
@@ -347,7 +386,7 @@ class JmxMetricCollector(MetricCollector):
def on_beans(self, source, beans):
for bean in beans:
- self.on_bean(source,bean)
+ self.on_bean(source, bean)
def on_bean(self, source, bean):
# mbean is of the form "domain:key=value,...,foo=bar"
@@ -357,7 +396,6 @@ class JmxMetricCollector(MetricCollector):
if not self.filter_bean(bean, mbean_domain):
return
-
context = bean.get("tag.Context", "")
metric_prefix_name = self.__build_metric_prefix(mbean_attribute, context)
@@ -367,7 +405,7 @@ class JmxMetricCollector(MetricCollector):
for listener in self.listeners:
listener.on_bean(source, bean.copy())
- def on_bean_kv(self, prefix,source, key, value):
+ def on_bean_kv(self, prefix, source, key, value):
# Skip Tags
if re.match(r'tag.*', key):
return
@@ -382,7 +420,9 @@ class JmxMetricCollector(MetricCollector):
def on_metric(self, metric):
if Helper.is_number(metric["value"]):
self.collect(metric)
-
+ elif isinstance(metric["value"], dict):
+ for key, value in metric["value"].iteritems():
+ self.on_bean_kv(metric["metric"], metric, key, value)
for listener in self.listeners:
listener.on_metric(metric.copy())
@@ -394,14 +434,51 @@ class JmxMetricCollector(MetricCollector):
name_index = [i[0] for i in mbean_list].index('name')
mbean_list[name_index][1] = context
metric_prefix_name = '.'.join([i[1] for i in mbean_list])
- return ("hadoop." + metric_prefix_name).replace(" ", "").lower()
+ return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower()
+# ========================
+# Metric Listeners
+# ========================
class JmxMetricListener:
def init(self, collector):
self.collector = collector
+ self.metric_prefix = self.collector.metric_prefix
def on_bean(self, component, bean):
pass
def on_metric(self, metric):
- pass
\ No newline at end of file
+ pass
+
+
+# ========================
+# Metric Filters
+# ========================
class MetricFilter:
    """Base class for filters that decide whether a collected metric is sent.

    Sub-classes must override ``init`` to read their settings from the
    collector configuration. ``filter_metric`` keeps every metric unless a
    sub-class overrides it.
    """

    def init(self, config=None):
        """Configure the filter from the collector configuration.

        :param config: parsed collector configuration dict (may be None)
        :raises NotImplementedError: always; sub-classes must override this.
        """
        # NotImplementedError is a subclass of Exception, so existing
        # ``except Exception`` handlers still catch it.
        raise NotImplementedError("init() method is called before being overrode in sub-class")

    def filter_metric(self, metric):
        """Return True to keep ``metric``, False to drop it.

        The base implementation keeps everything.
        """
        return True


class MetricNameFilter(MetricFilter):
    """Keep only metrics whose name matches one of the configured glob patterns.

    Patterns come from ``config["filter"]["metric_name_filter"]`` and are
    matched against ``metric["metric"]`` with ``fnmatch.fnmatch``, so
    shell-style wildcards (``*``, ``?``, ``[seq]``) are supported. When no
    patterns are configured, every metric is kept.
    """

    # Class-level default only; ``init`` always assigns a fresh per-instance
    # list so instances never share state through this attribute.
    metric_name_filter = []

    def init(self, config=None):
        """Load ``filter.metric_name_filter`` from ``config``.

        :param config: parsed collector configuration dict. ``None``, a
            missing/null ``filter`` section, or a missing/null
            ``metric_name_filter`` entry all leave the filter empty, which
            keeps every metric.
        """
        filter_config = (config or {}).get("filter") or {}
        if "metric_name_filter" in filter_config:
            # Copy, so later mutation of the config cannot change the filter.
            self.metric_name_filter = list(filter_config["metric_name_filter"] or [])
            logging.debug("Override filter.metric_name_filter: " + str(self.metric_name_filter))
        else:
            self.metric_name_filter = []

    def filter_metric(self, metric):
        """Return True if ``metric["metric"]`` matches any configured pattern.

        An empty pattern list keeps every metric.
        """
        if not self.metric_name_filter:
            return True
        name = metric["metric"]
        return any(fnmatch.fnmatch(name, pattern) for pattern in self.metric_name_filter)
\ No newline at end of file