You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/01/12 03:26:17 UTC
eagle git commit: [EAGLE-849] Refactor system metric collector scripts
Repository: eagle
Updated Branches:
refs/heads/master b4695801f -> 3af9ac480
[EAGLE-849] Refactor system metric collector scripts
Refactor System metric collector python script following similar framework as existing jmx metric collector.
https://issues.apache.org/jira/browse/EAGLE-849
Author: Hao Chen <ha...@apache.org>
Closes #763 from haoch/SystemMetricCollector.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3af9ac48
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3af9ac48
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3af9ac48
Branch: refs/heads/master
Commit: 3af9ac4802a9dec2f2160aa98cd3b90287e14d48
Parents: b469580
Author: Hao Chen <ha...@apache.org>
Authored: Thu Jan 12 11:26:10 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Jan 12 11:26:10 2017 +0800
----------------------------------------------------------------------
.../src/main/resources/ALERT_LIGHT_TEMPLATE.vm | 2 +-
.../system_metric_collector.py | 336 ++++++++++++++++
.../system_metric_config-sample.json | 20 +
.../hadoop_jmx_collector/system_metric_kafka.py | 393 -------------------
4 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
index 0eb5efc..f273917 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
@@ -482,7 +482,7 @@
<div class="footer">
<table width="100%">
<tr>
- <td class="aligncenter content-block">Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $alert["version"])</td>
+ <td class="aligncenter content-block">Powered by <a href="http://eagle.apache.org">Apache Eagle</a> (version: $alert["version"])</td>
</tr>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
new file mode 100644
index 0000000..e0ffecc
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
@@ -0,0 +1,336 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from metric_collector import MetricCollector, Runner
+import logging, socket, string, os, re, time
+
+
class SystemMetricCollector(MetricCollector):
    """Linux system metric collector.

    Reads /proc pseudo-files and system tools (ipmitool, smartctl, iostat,
    df) and emits metrics named ``system.<group>.<name>`` through the
    MetricCollector framework (see metric_collector.py). Individual
    collector failures are logged and do not abort the run.
    """

    METRIC_PREFIX = "system"
    # Strips parentheses out of raw metric names (e.g. meminfo rows such as
    # "VmallocChunk(kB)") so they are safe as flat metric names.
    METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]")

    def run(self):
        """Collector entry point invoked by Runner.

        Resolves the CPU-usage state file from config (``env.cpu_stat_file``,
        defaulting to /tmp/eagle_cpu_usage_state), then executes every
        collector function, tolerating per-collector failures.
        """
        # "in" is equivalent to the removed dict.has_key() and also works on
        # Python 3.
        if "cpu_stat_file" in self.config["env"]:
            self.cpu_stat_file = self.config["env"]["cpu_stat_file"]
            logging.info("Overrode env.cpu_stat_file: %s", self.cpu_stat_file)
        else:
            self.cpu_stat_file = "/tmp/eagle_cpu_usage_state"
            logging.info("Using default env.cpu_stat_file: %s", self.cpu_stat_file)

        self.try_exec_func(
            self.collect_cpu_metric,
            self.collect_uptime_metric,
            self.collect_memory_metric,
            self.collect_loadavg_metric,
            self.collect_cpu_temp_metric,
            self.collect_nic_metric,
            self.collect_smartdisk_metric,
            self.collect_diskstat_metric
        )

    def try_exec_func(self, *funcs):
        """Run each collector in *funcs*, logging (not raising) failures,
        then log a per-collector success/error summary."""
        result = dict()
        succeed_num = 0
        failed_num = 0
        for func in funcs:
            try:
                logging.info("Executing: %s", func.__name__)
                func()
                result[func.__name__] = "success"
                succeed_num += 1
            except Exception as e:
                logging.warn("Failed to execute: %s", func.__name__)
                logging.exception(e)
                result[func.__name__] = "error: %s: %s" % (type(e), e)
                failed_num += 1
        result_desc = ""
        for key in result:
            result_desc += "%-30s: %-30s\n" % (key, result[key])
        logging.info("Execution result (total: %s, succeed: %s, failed: %s): \n\n%s",
                     len(funcs), succeed_num, failed_num, result_desc)

    # ====================================
    # CPU Usage
    # ====================================

    def collect_cpu_metric(self):
        """
        CPU Usage Percentage Metrics:

        system.cpu.usage: (user + nice + system + wait + irq + softirq + steal + guest) / (user + nice + system + idle + wait + irq + softirq + steal + guest)

        Example:

        {'timestamp': 1483594861458, 'metric': 'system.cpu.usage', 'site': u'sandbox', 'value': 0.048, 'host': 'localhost', 'device': 'cpuN'}

        system.cpu.totalusage: Sum(Each CPU Usage) / Sum (CPU Total)

        Example:

        {'timestamp': 1483594861484, 'metric': 'system.cpu.totalusage', 'site': u'sandbox', 'value': 0.17, 'host': 'sandbox.hortonworks.com', 'device': 'cpu'}

        """
        cpu_metric = self.new_metric()
        cpu_info = os.popen('cat /proc/stat').readlines()
        # Column names of /proc/stat "cpuN" rows; index 0 is the device name.
        dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq",
                      "softirq", "steal", "guest"]

        total_cpu = 0
        total_cpu_usage = 0
        cpu_stat_pre = None

        # Previous run's "<usage> <total>" counters, used to compute the
        # delta-based system.cpu.totalusage metric.
        data_dir = self.cpu_stat_file
        if os.path.exists(data_dir):
            fd = open(data_dir, "r")
            cpu_stat_pre = fd.read()
            fd.close()

        for item in cpu_info:
            # Only per-core rows ("cpu0", "cpu1", ...), not the aggregate "cpu".
            if re.match(r'^cpu\d+', item) is None:
                continue

            items = re.split(r"\s+", item.strip())
            demens = min(len(dimensions), len(items))
            metric_event = dict()
            for i in range(1, demens):
                metric_event[dimensions[i]] = int(items[i])
                cpu_metric['timestamp'] = int(round(time.time() * 1000))
                cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i]
                cpu_metric['device'] = items[0]
                cpu_metric['value'] = items[i]
                self.collect(cpu_metric)

            per_cpu_usage = metric_event["user"] + metric_event["nice"] + \
                metric_event["system"] + metric_event["wait"] + \
                metric_event["irq"] + metric_event["softirq"] + \
                metric_event["steal"] + metric_event["guest"]
            # Total = usage + idle.
            per_cpu_total = per_cpu_usage + metric_event["idle"]
            total_cpu += per_cpu_total
            total_cpu_usage += per_cpu_usage

            # system.cpu.usage (instantaneous ratio for this core)
            cpu_metric['timestamp'] = int(round(time.time() * 1000))
            cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage"
            cpu_metric['device'] = items[0]
            cpu_metric['value'] = per_cpu_usage * 1.0 / per_cpu_total
            self.collect(cpu_metric)

        # Persist current counters for the next run's delta computation.
        cpu_stat_current = str(total_cpu_usage) + " " + str(total_cpu)
        logging.info("Current cpu stat: %s", cpu_stat_current)
        fd = open(data_dir, "w")
        fd.write(cpu_stat_current)
        fd.close()

        pre_total_cpu_usage = 0
        pre_total_cpu = 0
        if cpu_stat_pre is not None:
            result = re.split(r"\s+", cpu_stat_pre.rstrip())
            pre_total_cpu_usage = int(result[0])
            pre_total_cpu = int(result[1])
        cpu_metric['timestamp'] = int(round(time.time() * 1000))
        cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "totalusage"
        cpu_metric['device'] = "cpu"
        # NOTE(review): divides by the counter delta; a ZeroDivisionError is
        # possible only if /proc/stat counters did not advance at all.
        cpu_metric['value'] = (total_cpu_usage - pre_total_cpu_usage) * 1.0 / (total_cpu - pre_total_cpu)
        self.collect(cpu_metric)

    # ====================================
    # OS Up Time
    # ====================================

    def collect_uptime_metric(self):
        """Emit system.uptime.{uptime.day,idletime.day} from /proc/uptime,
        converting seconds to days (2 decimals)."""
        metric = self.new_metric()
        dimensions = ["uptime.day", "idletime.day"]
        output = os.popen('cat /proc/uptime').readlines()

        for item in output:
            items = re.split(r"\s+", item.rstrip())
            for i in range(len(dimensions)):
                metric["timestamp"] = int(round(time.time() * 1000))
                metric["metric"] = self.METRIC_PREFIX + "." + 'uptime' + '.' + dimensions[i]
                metric["value"] = str(round(float(items[i]) / 86400, 2))
                self.collect(metric)

    # ====================================
    # Memory
    # ====================================

    def collect_memory_metric(self):
        """Emit one metric per /proc/meminfo row plus the derived
        system.memory.usage percentage ((total - free - buffers - cached) / total)."""
        event = self.new_metric()
        event["host"] = self.fqdn
        output = os.popen('cat /proc/meminfo').readlines()
        mem_info = dict()
        for item in output:
            items = re.split(r":?\s+", item.rstrip())
            mem_info[items[0]] = int(items[1])
            metric = 'memory' + '.' + items[0]
            # Append the unit suffix ("kB") when present.
            if len(items) > 2:
                metric = metric + '.' + items[2]
            event["timestamp"] = int(round(time.time() * 1000))
            event["metric"] = self.METRIC_NAME_EXCLUDE.sub("", self.METRIC_PREFIX + "." + metric.lower())
            event["value"] = items[1]
            event["device"] = 'memory'
            self.collect(event)

        usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers']
                 - mem_info['Cached']) * 100.0 / mem_info['MemTotal']
        usage = round(usage, 2)
        self.emit_metric(event, self.METRIC_PREFIX, "memory.usage", usage, "memory")

    # ====================================
    # Load AVG
    # ====================================

    def collect_loadavg_metric(self):
        """Emit system.cpu.loadavg.{1min,5min,15min} from /proc/loadavg."""
        dimensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
        output = os.popen('cat /proc/loadavg').readlines()
        for item in output:
            items = re.split(r"\s+", item.rstrip())
            demens = min(len(dimensions), len(items))
            for i in range(demens):
                event = self.new_metric()
                event["timestamp"] = int(round(time.time() * 1000))
                event["metric"] = self.METRIC_PREFIX + "." + dimensions[i]
                event["value"] = items[i]
                event["device"] = 'cpu'
                self.collect(event)

    # ====================================
    # IPMI CPU Temp
    # ====================================

    def collect_cpu_temp_metric(self):
        """Emit system.cpu.temp per CPU from `sudo ipmitool sdr` (requires
        ipmitool and sudo rights; failures are tolerated by try_exec_func)."""
        output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
        for item in output:
            items = re.split(r"^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
            event = self.new_metric()
            event["timestamp"] = int(round(time.time() * 1000))
            # BUG FIX: DATA_TYPE was never defined in this module (leftover
            # global from the removed system_metric_kafka.py) -> NameError.
            event["metric"] = self.METRIC_PREFIX + "." + 'cpu.temp'
            event["value"] = items[2]
            # BUG FIX: was item[1] (second character of the raw line); the
            # "CPUn" device name is capture group 1 of the split, items[1].
            event["device"] = items[1]
            self.collect(event)

    # ====================================
    # NIC Metrics
    # ====================================

    def collect_nic_metric(self):
        """Emit system.nic.* rx/tx counters for eth* interfaces from
        /proc/net/dev."""
        dimensions = ['receivedbytes', 'receivedpackets', 'receivederrs',
                      'receiveddrop', 'transmitbytes', 'transmitpackets',
                      'transmiterrs', 'transmitdrop']
        output = os.popen("cat /proc/net/dev").readlines()

        for item in output:
            if re.match(r'^\s+eth\d+:', item) is None:
                continue
            items = re.split(r"[:\s]+", item.strip())
            # Columns 1-4: rx bytes/packets/errs/drop; 9-12: tx equivalents.
            filtered_items = items[1:5] + items[9:13]

            for i in range(len(dimensions)):
                event = self.new_metric()
                event["timestamp"] = int(round(time.time() * 1000))
                event['metric'] = self.METRIC_PREFIX + "." + 'nic.' + dimensions[i]
                event["value"] = filtered_items[i]
                event["device"] = items[0]
                self.collect(event)

    # ====================================
    # Smart Disk Metrics
    # ====================================

    def collect_smartdisk_metric(self):
        """Emit system.smartdisk.* S.M.A.R.T. attributes for every disk
        listed by lsscsi (requires smartctl and sudo rights)."""
        harddisks = os.popen("lsscsi").readlines()
        for item in harddisks:
            items = re.split(r'\/', item.strip())
            smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines()
            for line in smartctl:
                line = line.strip()
                # Attribute rows start with the numeric attribute id.
                if re.match(r'^[\d]+\s', line) is None:
                    continue
                lineitems = re.split(r"\s+", line)
                metric = 'smartdisk.' + lineitems[1]
                event = self.new_metric()
                # BUG FIX: DATA_TYPE was undefined here -> NameError.
                event['metric'] = self.METRIC_PREFIX + "." + metric.lower()
                event["timestamp"] = int(round(time.time() * 1000))
                event["value"] = lineitems[-1]
                event["device"] = 'smartdisk'
                self.collect(event)

    # ====================================
    # Disk Stat Metrics
    # ====================================

    def collect_diskstat_metric(self):
        """Emit system.disk.* I/O rates (iostat) and capacity/usage (df -k)
        per physical disk."""
        dimensions = ['readrate', 'writerate', 'avgwaittime', 'utilization',
                      'disktotal', 'diskused', 'usage']
        iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines()
        # iostat prints two samples; keep only the second (current) one.
        # "//" keeps integer slicing semantics under Python 2 and 3.
        iostat_output = iostat_output[len(iostat_output) // 2:]
        iostat_dict = {}
        for item in iostat_output:
            items = re.split(r'\s+', item.strip())
            # rkB/s, wkB/s, await, %util columns of `iostat -xk`.
            filtered_items = [items[5], items[6], items[9], items[11]]
            iostat_dict[items[0]] = filtered_items

        disk_output = os.popen("df -k | grep ^/dev").readlines()
        for item in disk_output:
            items = re.split(r'\s+', item.strip())
            disks = re.split(r'^\/dev\/(\w+)\d+$', items[0])
            # FIX for the previously noted "IndexError: list index out of
            # range": device names such as /dev/mapper/root do not match the
            # pattern (re.split returns a single element) - skip them, and
            # skip partitions whose parent disk iostat did not report.
            if len(disks) < 2:
                continue
            disk = disks[1]
            if disk not in iostat_dict:
                continue
            iostat_dict[disk].append(items[1])    # total KB
            iostat_dict[disk].append(items[2])    # used KB
            iostat_dict[disk].append(items[4].rstrip('%'))  # usage percent

        # .items() instead of the Python-2-only .iteritems().
        for key, metrics in iostat_dict.items():
            for i in range(len(metrics)):
                metric = 'disk.' + dimensions[i]
                event = self.new_metric()
                # BUG FIX: DATA_TYPE was undefined here -> NameError.
                event['metric'] = self.METRIC_PREFIX + "." + metric.lower()
                event["timestamp"] = int(round(time.time() * 1000))
                event["value"] = metrics[i]
                event["device"] = key
                self.collect(event)

    # ====================================
    # Helper Methods
    # ====================================

    def emit_metric(self, event, prefix, metric, value, device):
        """Fill the shared metric fields on *event* and collect it."""
        event["timestamp"] = int(round(time.time() * 1000))
        event["metric"] = prefix + "." + metric.lower()
        event["value"] = str(value)
        event["device"] = device
        self.collect(event)

    def new_metric(self):
        """Return a fresh metric dict pre-populated with this host's FQDN."""
        metric = dict()
        metric["host"] = self.fqdn
        return metric
+
+
# Script entry point: delegate to the shared Runner harness (defined in
# metric_collector.py), which loads config and drives the collector.
if __name__ == '__main__':
    Runner.run(SystemMetricCollector())
http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
new file mode 100644
index 0000000..6fcd43b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
@@ -0,0 +1,20 @@
+{
+ "env": {
+ "site": "sandbox",
+ "log_file": "/tmp/hadoop-jmx-collector.log",
+ "cpu_stat_file": "/tmp/eagle_cpu_usage_state"
+ },
+ "input": [
+ ],
+ "filter": {
+ },
+ "output": {
+ "kafka": {
+ "debug": false,
+ "default_topic": "system_metric_sandbox",
+ "broker_list": [
+ "sandbox.hortonworks.com:6667"
+ ]
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py b/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
deleted file mode 100644
index 7d805d7..0000000
--- a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
+++ /dev/null
@@ -1,393 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import re
-import time
-import json
-import sys
-import socket
-import types
-import re
-import errno
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
-import six
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-from util_func import *
-
-TOPIC = "cronus_sys_metrics"
-DATA_TYPE = "system"
-
-METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]")
-
-DEBUG_KAFKA_HOST = []
-PROD_KAFKA_HOST = []
-
-PORT_MAP = {
- "60030": "regionserver",
- "50075": "datanode",
- "50070": "namenode",
- "60010": "master",
- "50030": "resourcemanager",
- "50060": "nodemanager",
- "8480": "journalnode"
-}
-
-def readFile(filename):
- f = open(filename, 'r')
- s = f.read()
- f.close()
- return s
-
-def kafka_connect(host):
- print "Connecting to kafka " + str(host)
- # To send messages synchronously
- kafka = KafkaClient(host, timeout=58)
- producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=500, batch_send_every_t=30)
- return kafka, producer
-
-
-def kafka_close(kafka, producer):
- if producer is not None:
- producer.stop()
- if kafka is not None:
- kafka.close()
-
-
-def kafka_produce(producer, topic, kafka_json):
- # Note that the application is responsible for encoding messages to type str
- if producer != None :
- producer.send_messages(topic, kafka_json)
- else:
- print kafka_json
-
-
-def addExtraMetric(producer, kafka_dict, metric, value, device, topic):
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["metric"] = DATA_TYPE + "." + metric.lower()
- kafka_dict["value"] = str(value)
- kafka_dict["device"] = device
- kafka_json = json.dumps(kafka_dict)
- print(kafka_json)
- kafka_produce(producer, topic, kafka_json)
-
-
-def getCPU(producer, kafka_dict, topic):
- cpu_info = os.popen('cat /proc/stat').readlines()
- demension = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"]
-
- total_cpu = 0
- total_cpu_usage = 0
- cpu_stat_pre = None
-
- data_dir = "/tmp/eagle_cpu_stat_previous"
- if os.path.exists(data_dir):
- fd = open(data_dir, "r")
- cpu_stat_pre = fd.read()
- fd.close()
-
- for item in cpu_info:
- if re.match(r'^cpu\d+', item) is None:
- continue
-
- items = re.split("\s+", item.strip())
- demens = min(len(demension), len(items))
- # print items
- tuple = dict()
- for i in range(1, demens):
- # if not isNumber(items[i]):
- # continue
-
- tuple[demension[i]] = int(items[i])
-
- kafka_dict['timestamp'] = int(round(time.time() * 1000))
- kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + demension[i]
- kafka_dict['device'] = items[0]
- kafka_dict['value'] = items[i]
- kafka_json = json.dumps(kafka_dict)
- #print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
- per_cpu_usage = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["wait"] + tuple["irq"] + tuple[
- "softirq"] + tuple["steal"] + tuple["guest"]
- per_cpu_total = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["idle"] + tuple["wait"] + tuple["irq"] + \
- tuple["softirq"] + tuple["steal"] + tuple["guest"]
- total_cpu += per_cpu_total
- total_cpu_usage += per_cpu_usage
-
- # system.cpu.usage
- kafka_dict['timestamp'] = int(round(time.time() * 1000))
- kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "perusage"
- kafka_dict['device'] = items[0]
- kafka_dict['value'] = str(round(per_cpu_usage * 100.0 / per_cpu_total, 2))
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
- cup_stat_current = str(total_cpu_usage) + " " + str(total_cpu)
- print cup_stat_current
- fd = open(data_dir, "w")
- fd.write(cup_stat_current)
- fd.close()
-
- pre_total_cpu_usage = 0
- pre_total_cpu = 0
- if cpu_stat_pre != None:
- result = re.split("\s+", cpu_stat_pre.rstrip())
- pre_total_cpu_usage = int(result[0])
- pre_total_cpu = int(result[1])
- kafka_dict['timestamp'] = int(round(time.time() * 1000))
- kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "totalusage"
- kafka_dict['device'] = "cpu"
- kafka_dict['value'] = str(round((total_cpu_usage-pre_total_cpu_usage) * 100.0 / (total_cpu-pre_total_cpu), 2))
- kafka_json = json.dumps(kafka_dict)
-
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getUptime(producer, kafka_dict, topic):
- demension = ["uptime.day", "idletime.day"]
- output = os.popen('cat /proc/uptime').readlines()
-
- for item in output:
- items = re.split("\s+", item.rstrip())
- for i in range(len(demension)):
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["metric"] = DATA_TYPE + "." + 'uptime' + '.' + demension[i]
- kafka_dict["value"] = str(round(float(items[i]) / 86400, 2))
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getMemInfo(producer, kafka_dict, topic):
- output = os.popen('cat /proc/meminfo').readlines()
- mem_info = dict()
- for item in output:
- items = re.split(":?\s+", item.rstrip())
- # print items
- mem_info[items[0]] = int(items[1])
- itemNum = len(items)
- metric = 'memory' + '.' + items[0]
- if (len(items) > 2 ):
- metric = metric + '.' + items[2]
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["metric"] = METRIC_NAME_EXCLUDE.sub("", DATA_TYPE + "." + metric.lower())
- kafka_dict["value"] = items[1]
- kafka_dict["device"] = 'memory'
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
- usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers'] - mem_info['Cached']) * 100.0 / mem_info[
- 'MemTotal']
- usage = round(usage, 2)
- addExtraMetric(producer, kafka_dict, "memory.usage", usage, "memory", topic)
-
-
-def getLoadAvg(producer, kafka_dict, topic):
- demension = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
- output = os.popen('cat /proc/loadavg').readlines()
- for item in output:
- items = re.split("\s+", item.rstrip())
-
- demens = min(len(demension), len(items))
- for i in range(demens):
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["metric"] = DATA_TYPE + "." + demension[i]
- kafka_dict["value"] = items[i]
- kafka_dict["device"] = 'cpu'
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getIpmiCPUTemp(producer, kafka_dict, topic):
- output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
- for item in output:
- items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["metric"] = DATA_TYPE + "." + 'cpu.temp'
- kafka_dict["value"] = items[2]
- kafka_dict["device"] = item[1]
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getInterface(producer, kafka_dict, topic):
- demension = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets',
- 'transmiterrs', 'transmitdrop']
- output = os.popen("cat /proc/net/dev").readlines()
-
- for item in output:
- if re.match(r'^\s+eth\d+:', item) is None:
- continue
- items = re.split("[:\s]+", item.strip())
- filtered_items = items[1:5] + items[9:13]
-
- for i in range(len(demension)):
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict['metric'] = DATA_TYPE + "." + 'nic.' + demension[i]
- kafka_dict["value"] = filtered_items[i]
- kafka_dict["device"] = items[0]
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getSmartDisk(producer, kafka_dict, topic):
- harddisks = os.popen("lsscsi").readlines()
- for item in harddisks:
- items = re.split('\/', item.strip())
- # print items
- smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines()
- for line in smartctl:
- line = line.strip()
- if re.match(r'^[\d]+\s', line) is None:
- continue
- lineitems = re.split("\s+", line)
- metric = 'smartdisk.' + lineitems[1]
- kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["value"] = lineitems[-1]
- kafka_dict["device"] = 'smartdisk'
- kafka_json = json.dumps(kafka_dict)
- print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def getDiskStat(producer, kafka_dict, topic):
- demension = ['readrate', 'writerate', 'avgwaittime', 'utilization', 'disktotal', 'diskused', 'usage']
- iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines()
- # remove the first set of elements
- iostat_output = iostat_output[len(iostat_output) / 2:]
- # print iostat_output
- iostat_dict = {}
- for item in iostat_output:
- items = re.split('\s+', item.strip())
- # print items
- filtered_items = [items[5], items[6], items[9], items[11]]
- iostat_dict[items[0]] = filtered_items
- # print iostat_dict
-
- disk_output = os.popen("df -k | grep ^/dev").readlines()
- for item in disk_output:
- items = re.split('\s+', item.strip())
- fs = re.split('^\/dev\/(\w+)\d+$', items[0])
- disk = fs[1]
- iostat_dict[disk].append(items[1])
- iostat_dict[disk].append(items[2])
- iostat_dict[disk].append(items[4].rstrip('%'))
- #print iostat_dict
-
- for key, metrics in iostat_dict.iteritems():
- for i in range(len(metrics)):
- metric = 'disk.' + demension[i]
- kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
- kafka_dict["timestamp"] = int(round(time.time() * 1000))
- kafka_dict["value"] = metrics[i]
- kafka_dict["device"] = key
- kafka_json = json.dumps(kafka_dict)
- # print kafka_json
- kafka_produce(producer, topic, kafka_json)
-
-
-def get_services(host):
- service_list = list()
- socket.setdefaulttimeout(1)
- for (key, value) in PORT_MAP.items():
- try:
- handle = None
- port = int(key)
- handle = socket.socket().connect((host, port))
- service_list.append(value)
- except socket.error as err:
- # if err.errno != errno.ECONNREFUSED:
- # service_list.append(value)
- pass
- finally:
- if handle != None:
- handle.close()
-
- return service_list
-
-def tryGetSystemMetric(type, func, *args):
- try:
- func(*args)
- except:
- print type + " does not work, ignore"
-
-DEVICE_CONF = {
- "cpustat": getCPU,
- "uptime": getUptime,
- "meminfo": getMemInfo,
- "loadavg": getLoadAvg,
- "ipmicputemp": getIpmiCPUTemp,
- "network": getInterface,
- "smartdisk": getSmartDisk,
- "diskstat": getDiskStat
-}
-
-def main(argv):
- kafka = None
- producer = None
- topic = None
- try:
- # read the kafka.ini
- config = load_config('config.json')
- print config
-
- site = config[u'env'].get('site').encode('utf-8')
- component = config[u'input'].get('component').encode('utf-8')
- host = socket.getfqdn()
- print host
-
- outputs = [s.encode('utf-8') for s in config[u'output']]
-
- if('kafka' in outputs):
- kafkaConfig = config[u'output'].get(u'kafka')
- brokerList = kafkaConfig.get('brokerList')
- topic = kafkaConfig.get('topic')
- kafka, producer = kafka_connect(brokerList)
-
- kafka_dict = {"host": host, "value": 0, "device": ''}
- services = get_services(host)
- print services
- for service in services:
- kafka_dict[service] = 'true'
-
- for type, func in DEVICE_CONF.items():
- print type + ":" + str(func)
- tryGetSystemMetric(type, func, kafka, kafka_dict, topic)
-
- except Exception, e:
- print 'main except: ', e
-
- finally:
- kafka_close(kafka, producer)
- return 0
-
-
-if __name__ == "__main__":
- sys.exit(main(sys.argv))