You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2017/01/12 03:26:17 UTC

eagle git commit: [EAGLE-849] Refactor system metric collector scripts

Repository: eagle
Updated Branches:
  refs/heads/master b4695801f -> 3af9ac480


[EAGLE-849] Refactor system metric collector scripts

Refactor the system metric collector Python script to follow a framework similar to the existing JMX metric collector.

https://issues.apache.org/jira/browse/EAGLE-849

Author: Hao Chen <ha...@apache.org>

Closes #763 from haoch/SystemMetricCollector.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/3af9ac48
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/3af9ac48
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/3af9ac48

Branch: refs/heads/master
Commit: 3af9ac4802a9dec2f2160aa98cd3b90287e14d48
Parents: b469580
Author: Hao Chen <ha...@apache.org>
Authored: Thu Jan 12 11:26:10 2017 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Thu Jan 12 11:26:10 2017 +0800

----------------------------------------------------------------------
 .../src/main/resources/ALERT_LIGHT_TEMPLATE.vm  |   2 +-
 .../system_metric_collector.py                  | 336 ++++++++++++++++
 .../system_metric_config-sample.json            |  20 +
 .../hadoop_jmx_collector/system_metric_kafka.py | 393 -------------------
 4 files changed, 357 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
index 0eb5efc..f273917 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
@@ -482,7 +482,7 @@
                 <div class="footer">
                     <table width="100%">
                         <tr>
-                            <td class="aligncenter content-block">Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $alert["version"])</td>
+                            <td class="aligncenter content-block">Powered by <a href="http://eagle.apache.org">Apache Eagle</a> (version: $alert["version"])</td>
                         </tr>
                     </table>
                 </div>

http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
new file mode 100644
index 0000000..e0ffecc
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py
@@ -0,0 +1,336 @@
+#!/usr/bin/python
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from metric_collector import MetricCollector, Runner
+import logging, socket, string, os, re, time
+
+
class SystemMetricCollector(MetricCollector):
    """Collect Linux system metrics (CPU, uptime, memory, load average,
    CPU temperature, NIC, SMART disk and disk I/O) from /proc and system
    tools, and emit them through the MetricCollector framework.

    Every emitted event is a dict with keys: host, timestamp (epoch ms),
    metric (dotted name under the "system" prefix), value and device.
    """

    METRIC_PREFIX = "system"
    # Strips parentheses from metric names (e.g. /proc/meminfo fields
    # such as "VmallocChunk(...)") so names stay storage-safe.
    METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]")

    def run(self):
        """Entry point invoked by Runner: resolve the CPU-usage state file
        from config (env.cpu_stat_file) and run every collector, tolerating
        individual collector failures.
        """
        # 'in' instead of py2-only dict.has_key(): same behavior, portable.
        if "cpu_stat_file" in self.config["env"]:
            self.cpu_stat_file = self.config["env"]["cpu_stat_file"]
            logging.info("Overrode env.cpu_stat_file: %s", self.cpu_stat_file)
        else:
            self.cpu_stat_file = "/tmp/eagle_cpu_usage_state"
            logging.info("Using default env.cpu_stat_file: %s", self.cpu_stat_file)

        self.try_exec_func(
            self.collect_cpu_metric,
            self.collect_uptime_metric,
            self.collect_memory_metric,
            self.collect_loadavg_metric,
            self.collect_cpu_temp_metric,
            self.collect_nic_metric,
            self.collect_smartdisk_metric,
            self.collect_diskstat_metric
        )

    def try_exec_func(self, *funcs):
        """Execute each collector callable, logging (never propagating)
        exceptions, then log a per-function success/error summary.
        """
        result = dict()
        succeed_num = 0
        failed_num = 0
        for func in funcs:
            try:
                logging.info("Executing: %s", func.__name__)
                func()
                result[func.__name__] = "success"
                succeed_num = succeed_num + 1
            except Exception as e:
                logging.warn("Failed to execute: %s", func.__name__)
                logging.exception(e)
                result[func.__name__] = "error: %s: %s" % (type(e), e)
                failed_num = failed_num + 1
        result_desc = ""
        for key in result:
            result_desc = result_desc + "%-30s: %-30s\n" % (key, result[key])
        logging.info("Execution result (total: %s, succeed: %s, failed: %s): \n\n%s", len(funcs), succeed_num,
                     failed_num, result_desc)

    # ====================================
    # CPU Usage
    # ====================================

    def collect_cpu_metric(self):
        """Collect per-CPU counters and usage ratios from /proc/stat.

        Emits:
          system.cpu.<counter>   raw jiffy counters per CPU (user, nice, ...)
          system.cpu.usage       per-CPU busy ratio: busy jiffies / total jiffies
          system.cpu.totalusage  host-wide busy ratio since the previous run,
                                 computed against totals persisted in
                                 self.cpu_stat_file
        """
        cpu_metric = self.new_metric()
        # NOTE(review): one dict is mutated and re-collected for every metric;
        # this assumes MetricCollector.collect() copies/serializes the event
        # immediately — confirm against the framework implementation.
        cpu_info = os.popen('cat /proc/stat').readlines()
        dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"]

        total_cpu = 0
        total_cpu_usage = 0
        cpu_stat_pre = None

        # Previous run's "<busy> <total>" totals, if any, for the delta below.
        data_dir = self.cpu_stat_file
        if os.path.exists(data_dir):
            fd = open(data_dir, "r")
            cpu_stat_pre = fd.read()
            fd.close()

        for item in cpu_info:
            # Only the per-CPU lines (cpu0, cpu1, ...), not the aggregate "cpu".
            if re.match(r'^cpu\d+', item) is None:
                continue

            items = re.split(r"\s+", item.strip())
            demens = min(len(dimensions), len(items))
            metric_event = dict()
            for i in range(1, demens):
                metric_event[dimensions[i]] = int(items[i])
                cpu_metric['timestamp'] = int(round(time.time() * 1000))
                cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i]
                cpu_metric['device'] = items[0]
                cpu_metric['value'] = items[i]
                self.collect(cpu_metric)

            # .get(..., 0) tolerates kernels that expose fewer columns
            # (e.g. no "steal"/"guest"); the original raised KeyError there.
            per_cpu_usage = sum(metric_event.get(d, 0) for d in
                                ("user", "nice", "system", "wait", "irq", "softirq", "steal", "guest"))
            per_cpu_total = per_cpu_usage + metric_event.get("idle", 0)
            total_cpu += per_cpu_total
            total_cpu_usage += per_cpu_usage

            # system.cpu.usage
            cpu_metric['timestamp'] = int(round(time.time() * 1000))
            cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage"
            cpu_metric['device'] = items[0]
            cpu_metric['value'] = per_cpu_usage * 1.0 / per_cpu_total
            self.collect(cpu_metric)

        # Persist current totals so the next run can compute a delta.
        cpu_stat_current = str(total_cpu_usage) + " " + str(total_cpu)
        logging.info("Current cpu stat: %s", cpu_stat_current)
        fd = open(data_dir, "w")
        fd.write(cpu_stat_current)
        fd.close()

        pre_total_cpu_usage = 0
        pre_total_cpu = 0
        if cpu_stat_pre is not None:
            result = re.split(r"\s+", cpu_stat_pre.rstrip())
            pre_total_cpu_usage = int(result[0])
            pre_total_cpu = int(result[1])
        delta_usage = total_cpu_usage - pre_total_cpu_usage
        delta_total = total_cpu - pre_total_cpu
        cpu_metric['timestamp'] = int(round(time.time() * 1000))
        cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "totalusage"
        cpu_metric['device'] = "cpu"
        # Guard against a zero delta (e.g. two runs within the same jiffy),
        # which previously raised ZeroDivisionError.
        cpu_metric['value'] = delta_usage * 1.0 / delta_total if delta_total else 0.0

        self.collect(cpu_metric)

    # ====================================
    # OS Up Time
    # ====================================

    def collect_uptime_metric(self):
        """Collect system.uptime.uptime.day and system.uptime.idletime.day
        from /proc/uptime, converted from seconds to days (2 decimals).
        """
        metric = self.new_metric()
        dimensions = ["uptime.day", "idletime.day"]
        output = os.popen('cat /proc/uptime').readlines()

        for item in output:
            items = re.split(r"\s+", item.rstrip())
            for i in range(len(dimensions)):
                metric["timestamp"] = int(round(time.time() * 1000))
                metric["metric"] = self.METRIC_PREFIX + "." + 'uptime' + '.' + dimensions[i]
                # 86400 seconds per day.
                metric["value"] = str(round(float(items[i]) / 86400, 2))
                self.collect(metric)

    # ====================================
    # Memory
    # ====================================

    def collect_memory_metric(self):
        """Collect every /proc/meminfo field as system.memory.<field>[.<unit>]
        plus a derived system.memory.usage percentage:
        (MemTotal - MemFree - Buffers - Cached) * 100 / MemTotal.
        """
        event = self.new_metric()
        event["host"] = self.fqdn
        output = os.popen('cat /proc/meminfo').readlines()
        mem_info = dict()
        for item in output:
            items = re.split(r":?\s+", item.rstrip())
            mem_info[items[0]] = int(items[1])
            metric = 'memory' + '.' + items[0]
            if len(items) > 2:
                # Third column is the unit suffix, e.g. "kB".
                metric = metric + '.' + items[2]
            event["timestamp"] = int(round(time.time() * 1000))
            event["metric"] = self.METRIC_NAME_EXCLUDE.sub("", self.METRIC_PREFIX + "." + metric.lower())
            event["value"] = items[1]
            event["device"] = 'memory'
            self.collect(event)

        usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers'] - mem_info['Cached']) * 100.0 / \
                mem_info['MemTotal']
        usage = round(usage, 2)
        self.emit_metric(event, self.METRIC_PREFIX, "memory.usage", usage, "memory")

    # ====================================
    # Load AVG
    # ====================================

    def collect_loadavg_metric(self):
        """Collect 1/5/15-minute load averages from /proc/loadavg as
        system.cpu.loadavg.{1min,5min,15min}.
        """
        dimensions = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
        output = os.popen('cat /proc/loadavg').readlines()
        for item in output:
            items = re.split(r"\s+", item.rstrip())
            demens = min(len(dimensions), len(items))
            for i in range(demens):
                event = self.new_metric()
                event["timestamp"] = int(round(time.time() * 1000))
                event["metric"] = self.METRIC_PREFIX + "." + dimensions[i]
                event["value"] = items[i]
                event["device"] = 'cpu'
                self.collect(event)

    # ====================================
    # IPMI CPU Temp
    # ====================================

    def collect_cpu_temp_metric(self):
        """Collect per-CPU temperature (system.cpu.temp) from
        `ipmitool sdr` output (requires sudo).
        """
        output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
        for item in output:
            items = re.split(r"^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
            event = self.new_metric()
            event["timestamp"] = int(round(time.time() * 1000))
            # Was DATA_TYPE, which is undefined in this module (it lived in
            # the removed system_metric_kafka.py) and raised NameError.
            event["metric"] = self.METRIC_PREFIX + "." + 'cpu.temp'
            event["value"] = items[2]
            # Was item[1] (the second *character* of the raw line); the regex
            # capture items[1] holds the device name, e.g. "CPU1".
            event["device"] = items[1]
            self.collect(event)

    # ====================================
    # NIC Metrics
    # ====================================

    def collect_nic_metric(self):
        """Collect ethN receive/transmit counters from /proc/net/dev as
        system.nic.<counter>.
        """
        dimensions = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes',
                      'transmitpackets', 'transmiterrs', 'transmitdrop']
        output = os.popen("cat /proc/net/dev").readlines()

        for item in output:
            if re.match(r'^\s+eth\d+:', item) is None:
                continue
            items = re.split(r"[:\s]+", item.strip())
            # Columns 1-4 are receive stats, 9-12 are transmit stats.
            filtered_items = items[1:5] + items[9:13]

            for i in range(len(dimensions)):
                event = self.new_metric()
                event["timestamp"] = int(round(time.time() * 1000))
                event['metric'] = self.METRIC_PREFIX + "." + 'nic.' + dimensions[i]
                event["value"] = filtered_items[i]
                event["device"] = items[0]
                self.collect(event)

    # ====================================
    # Smart Disk Metrics
    # ====================================

    def collect_smartdisk_metric(self):
        """Collect SMART attributes for every lsscsi-listed device via
        `smartctl -A` (requires sudo), as system.smartdisk.<attribute>.
        """
        harddisks = os.popen("lsscsi").readlines()
        for item in harddisks:
            items = re.split(r'\/', item.strip())
            smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines()
            for line in smartctl:
                line = line.strip()
                # Attribute rows start with the numeric attribute id.
                if re.match(r'^[\d]+\s', line) is None:
                    continue
                lineitems = re.split(r"\s+", line)
                metric = 'smartdisk.' + lineitems[1]
                event = self.new_metric()
                # Was DATA_TYPE (undefined in this module, NameError);
                # use the class metric prefix.
                event['metric'] = self.METRIC_PREFIX + "." + metric.lower()
                event["timestamp"] = int(round(time.time() * 1000))
                event["value"] = lineitems[-1]
                event["device"] = 'smartdisk'
                self.collect(event)

    # ====================================
    # Disk Stat Metrics
    # ====================================

    def collect_diskstat_metric(self):
        """Collect per-device disk I/O rates (iostat) and capacity (df)
        metrics as system.disk.<dimension>.
        """
        dimensions = ['readrate', 'writerate', 'avgwaittime', 'utilization', 'disktotal', 'diskused', 'usage']
        iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines()
        # iostat prints a boot-average report first; keep only the second
        # (interval) report.
        iostat_output = iostat_output[len(iostat_output) / 2:]
        iostat_dict = {}
        for item in iostat_output:
            items = re.split(r'\s+', item.strip())
            # rkB/s, wkB/s, await, %util columns of `iostat -xk`.
            filtered_items = [items[5], items[6], items[9], items[11]]
            iostat_dict[items[0]] = filtered_items

        disk_output = os.popen("df -k | grep ^/dev").readlines()
        for item in disk_output:
            items = re.split(r'\s+', item.strip())
            disks = re.split(r'^\/dev\/(\w+)\d+$', items[0])
            if len(disks) < 2:
                # Filesystem path did not match /dev/<name><digits>
                # (e.g. /dev/mapper/...); skipping avoids the IndexError
                # the original FIXME warned about.
                continue
            disk = disks[1]
            if disk not in iostat_dict:
                # df may list devices iostat did not report; skip instead
                # of raising KeyError.
                continue
            iostat_dict[disk].append(items[1])
            iostat_dict[disk].append(items[2])
            iostat_dict[disk].append(items[4].rstrip('%'))

        for key, metrics in iostat_dict.items():
            for i in range(len(metrics)):
                metric = 'disk.' + dimensions[i]
                event = self.new_metric()
                # Was DATA_TYPE (undefined in this module, NameError);
                # use the class metric prefix.
                event['metric'] = self.METRIC_PREFIX + "." + metric.lower()
                event["timestamp"] = int(round(time.time() * 1000))
                event["value"] = metrics[i]
                event["device"] = key
                self.collect(event)

    # ====================================
    # Helper Methods
    # ====================================

    def emit_metric(self, event, prefix, metric, value, device):
        """Populate the given event dict (timestamp, metric, value, device)
        and collect it as a single metric.
        """
        event["timestamp"] = int(round(time.time() * 1000))
        event["metric"] = prefix + "." + metric.lower()
        event["value"] = str(value)
        event["device"] = device
        self.collect(event)

    def new_metric(self):
        """Return a fresh metric event dict pre-populated with the host
        FQDN (self.fqdn is provided by MetricCollector).
        """
        metric = dict()
        metric["host"] = self.fqdn
        return metric
+
+
# Script entry point: delegate lifecycle (config loading, logging setup,
# scheduling) to the shared Runner from metric_collector.
if __name__ == '__main__':
    Runner.run(SystemMetricCollector())

http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
new file mode 100644
index 0000000..6fcd43b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json
@@ -0,0 +1,20 @@
+{
+  "env": {
+    "site": "sandbox",
+    "log_file": "/tmp/hadoop-jmx-collector.log",
+    "cpu_stat_file": "/tmp/eagle_cpu_usage_state"
+  },
+  "input": [
+  ],
+  "filter": {
+  },
+  "output": {
+    "kafka": {
+      "debug": false,
+      "default_topic": "system_metric_sandbox",
+      "broker_list": [
+        "sandbox.hortonworks.com:6667"
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/eagle/blob/3af9ac48/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py b/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
deleted file mode 100644
index 7d805d7..0000000
--- a/eagle-external/hadoop_jmx_collector/system_metric_kafka.py
+++ /dev/null
@@ -1,393 +0,0 @@
-#!/usr/bin/python
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import re
-import time
-import json
-import sys
-import socket
-import types
-import re
-import errno
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
-import six
-
-# load kafka-python
-sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
-from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-
-from util_func import *
-
-TOPIC = "cronus_sys_metrics"
-DATA_TYPE = "system"
-
-METRIC_NAME_EXCLUDE = re.compile(r"[\(|\)]")
-
-DEBUG_KAFKA_HOST = []
-PROD_KAFKA_HOST = []
-
-PORT_MAP = {
-    "60030": "regionserver",
-    "50075": "datanode",
-    "50070": "namenode",
-    "60010": "master",
-    "50030": "resourcemanager",
-    "50060": "nodemanager",
-    "8480": "journalnode"
-}
-
-def readFile(filename):
-    f = open(filename, 'r')
-    s = f.read()
-    f.close()
-    return s
-
-def kafka_connect(host):
-    print "Connecting to kafka " + str(host)
-    # To send messages synchronously
-    kafka = KafkaClient(host, timeout=58)
-    producer = SimpleProducer(kafka, batch_send=True, batch_send_every_n=500, batch_send_every_t=30)
-    return kafka, producer
-
-
-def kafka_close(kafka, producer):
-    if producer is not None:
-        producer.stop()
-    if kafka is not None:
-        kafka.close()
-
-
-def kafka_produce(producer, topic, kafka_json):
-    # Note that the application is responsible for encoding messages to type str
-    if producer != None :
-        producer.send_messages(topic, kafka_json)
-    else:
-        print kafka_json
-
-
-def addExtraMetric(producer, kafka_dict, metric, value, device, topic):
-    kafka_dict["timestamp"] = int(round(time.time() * 1000))
-    kafka_dict["metric"] = DATA_TYPE + "." + metric.lower()
-    kafka_dict["value"] = str(value)
-    kafka_dict["device"] = device
-    kafka_json = json.dumps(kafka_dict)
-    print(kafka_json)
-    kafka_produce(producer, topic, kafka_json)
-
-
-def getCPU(producer, kafka_dict, topic):
-    cpu_info = os.popen('cat /proc/stat').readlines()
-    demension = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"]
-
-    total_cpu = 0
-    total_cpu_usage = 0
-    cpu_stat_pre = None
-
-    data_dir = "/tmp/eagle_cpu_stat_previous"
-    if os.path.exists(data_dir):
-        fd = open(data_dir, "r")
-        cpu_stat_pre = fd.read()
-        fd.close()
-
-    for item in cpu_info:
-        if re.match(r'^cpu\d+', item) is None:
-            continue
-
-        items = re.split("\s+", item.strip())
-        demens = min(len(demension), len(items))
-        # print items
-        tuple = dict()
-        for i in range(1, demens):
-            # if not isNumber(items[i]):
-            # continue
-
-            tuple[demension[i]] = int(items[i])
-
-            kafka_dict['timestamp'] = int(round(time.time() * 1000))
-            kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + demension[i]
-            kafka_dict['device'] = items[0]
-            kafka_dict['value'] = items[i]
-            kafka_json = json.dumps(kafka_dict)
-            #print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-        per_cpu_usage = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["wait"] + tuple["irq"] + tuple[
-            "softirq"] + tuple["steal"] + tuple["guest"]
-        per_cpu_total = tuple["user"] + tuple["nice"] + tuple["system"] + tuple["idle"] + tuple["wait"] + tuple["irq"] + \
-                        tuple["softirq"] + tuple["steal"] + tuple["guest"]
-        total_cpu += per_cpu_total
-        total_cpu_usage += per_cpu_usage
-
-        # system.cpu.usage
-        kafka_dict['timestamp'] = int(round(time.time() * 1000))
-        kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "perusage"
-        kafka_dict['device'] = items[0]
-        kafka_dict['value'] = str(round(per_cpu_usage * 100.0 / per_cpu_total, 2))
-        kafka_json = json.dumps(kafka_dict)
-        print kafka_json
-        kafka_produce(producer, topic, kafka_json)
-
-    cup_stat_current = str(total_cpu_usage) + " " + str(total_cpu)
-    print cup_stat_current
-    fd = open(data_dir, "w")
-    fd.write(cup_stat_current)
-    fd.close()
-
-    pre_total_cpu_usage = 0
-    pre_total_cpu = 0
-    if cpu_stat_pre != None:
-        result = re.split("\s+", cpu_stat_pre.rstrip())
-        pre_total_cpu_usage = int(result[0])
-        pre_total_cpu = int(result[1])
-    kafka_dict['timestamp'] = int(round(time.time() * 1000))
-    kafka_dict['metric'] = DATA_TYPE + "." + 'cpu.' + "totalusage"
-    kafka_dict['device'] = "cpu"
-    kafka_dict['value'] = str(round((total_cpu_usage-pre_total_cpu_usage) * 100.0 / (total_cpu-pre_total_cpu), 2))
-    kafka_json = json.dumps(kafka_dict)
-
-    print kafka_json
-    kafka_produce(producer, topic, kafka_json)
-
-
-def getUptime(producer, kafka_dict, topic):
-    demension = ["uptime.day", "idletime.day"]
-    output = os.popen('cat /proc/uptime').readlines()
-
-    for item in output:
-        items = re.split("\s+", item.rstrip())
-        for i in range(len(demension)):
-            kafka_dict["timestamp"] = int(round(time.time() * 1000))
-            kafka_dict["metric"] = DATA_TYPE + "." + 'uptime' + '.' + demension[i]
-            kafka_dict["value"] = str(round(float(items[i]) / 86400, 2))
-            kafka_json = json.dumps(kafka_dict)
-            print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-
-def getMemInfo(producer, kafka_dict, topic):
-    output = os.popen('cat /proc/meminfo').readlines()
-    mem_info = dict()
-    for item in output:
-        items = re.split(":?\s+", item.rstrip())
-        # print items
-        mem_info[items[0]] = int(items[1])
-        itemNum = len(items)
-        metric = 'memory' + '.' + items[0]
-        if (len(items) > 2 ):
-            metric = metric + '.' + items[2]
-        kafka_dict["timestamp"] = int(round(time.time() * 1000))
-        kafka_dict["metric"] = METRIC_NAME_EXCLUDE.sub("", DATA_TYPE + "." + metric.lower())
-        kafka_dict["value"] = items[1]
-        kafka_dict["device"] = 'memory'
-        kafka_json = json.dumps(kafka_dict)
-        print kafka_json
-        kafka_produce(producer, topic, kafka_json)
-    usage = (mem_info['MemTotal'] - mem_info['MemFree'] - mem_info['Buffers'] - mem_info['Cached']) * 100.0 / mem_info[
-        'MemTotal']
-    usage = round(usage, 2)
-    addExtraMetric(producer, kafka_dict, "memory.usage", usage, "memory", topic)
-
-
-def getLoadAvg(producer, kafka_dict, topic):
-    demension = ['cpu.loadavg.1min', 'cpu.loadavg.5min', 'cpu.loadavg.15min']
-    output = os.popen('cat /proc/loadavg').readlines()
-    for item in output:
-        items = re.split("\s+", item.rstrip())
-
-        demens = min(len(demension), len(items))
-        for i in range(demens):
-            kafka_dict["timestamp"] = int(round(time.time() * 1000))
-            kafka_dict["metric"] = DATA_TYPE + "." + demension[i]
-            kafka_dict["value"] = items[i]
-            kafka_dict["device"] = 'cpu'
-            kafka_json = json.dumps(kafka_dict)
-            print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-
-def getIpmiCPUTemp(producer, kafka_dict, topic):
-    output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines()
-    for item in output:
-        items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip())
-        kafka_dict["timestamp"] = int(round(time.time() * 1000))
-        kafka_dict["metric"] = DATA_TYPE + "." + 'cpu.temp'
-        kafka_dict["value"] = items[2]
-        kafka_dict["device"] = item[1]
-        kafka_json = json.dumps(kafka_dict)
-        print kafka_json
-        kafka_produce(producer, topic, kafka_json)
-
-
-def getInterface(producer, kafka_dict, topic):
-    demension = ['receivedbytes', 'receivedpackets', 'receivederrs', 'receiveddrop', 'transmitbytes', 'transmitpackets',
-                 'transmiterrs', 'transmitdrop']
-    output = os.popen("cat /proc/net/dev").readlines()
-
-    for item in output:
-        if re.match(r'^\s+eth\d+:', item) is None:
-            continue
-        items = re.split("[:\s]+", item.strip())
-        filtered_items = items[1:5] + items[9:13]
-
-        for i in range(len(demension)):
-            kafka_dict["timestamp"] = int(round(time.time() * 1000))
-            kafka_dict['metric'] = DATA_TYPE + "." + 'nic.' + demension[i]
-            kafka_dict["value"] = filtered_items[i]
-            kafka_dict["device"] = items[0]
-            kafka_json = json.dumps(kafka_dict)
-            print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-
-def getSmartDisk(producer, kafka_dict, topic):
-    harddisks = os.popen("lsscsi").readlines()
-    for item in harddisks:
-        items = re.split('\/', item.strip())
-        # print items
-        smartctl = os.popen('sudo smartctl -A /dev/' + items[-1]).readlines()
-        for line in smartctl:
-            line = line.strip()
-            if re.match(r'^[\d]+\s', line) is None:
-                continue
-            lineitems = re.split("\s+", line)
-            metric = 'smartdisk.' + lineitems[1]
-            kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
-            kafka_dict["timestamp"] = int(round(time.time() * 1000))
-            kafka_dict["value"] = lineitems[-1]
-            kafka_dict["device"] = 'smartdisk'
-            kafka_json = json.dumps(kafka_dict)
-            print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-
-def getDiskStat(producer, kafka_dict, topic):
-    demension = ['readrate', 'writerate', 'avgwaittime', 'utilization', 'disktotal', 'diskused', 'usage']
-    iostat_output = os.popen("iostat -xk 1 2 | grep ^sd").readlines()
-    # remove the first set of elements
-    iostat_output = iostat_output[len(iostat_output) / 2:]
-    # print iostat_output
-    iostat_dict = {}
-    for item in iostat_output:
-        items = re.split('\s+', item.strip())
-        # print items
-        filtered_items = [items[5], items[6], items[9], items[11]]
-        iostat_dict[items[0]] = filtered_items
-    # print iostat_dict
-
-    disk_output = os.popen("df -k | grep ^/dev").readlines()
-    for item in disk_output:
-        items = re.split('\s+', item.strip())
-        fs = re.split('^\/dev\/(\w+)\d+$', items[0])
-        disk = fs[1]
-        iostat_dict[disk].append(items[1])
-        iostat_dict[disk].append(items[2])
-        iostat_dict[disk].append(items[4].rstrip('%'))
-    #print iostat_dict
-
-    for key, metrics in iostat_dict.iteritems():
-        for i in range(len(metrics)):
-            metric = 'disk.' + demension[i]
-            kafka_dict['metric'] = DATA_TYPE + "." + metric.lower()
-            kafka_dict["timestamp"] = int(round(time.time() * 1000))
-            kafka_dict["value"] = metrics[i]
-            kafka_dict["device"] = key
-            kafka_json = json.dumps(kafka_dict)
-            # print kafka_json
-            kafka_produce(producer, topic, kafka_json)
-
-
-def get_services(host):
-    service_list = list()
-    socket.setdefaulttimeout(1)
-    for (key, value) in PORT_MAP.items():
-        try:
-            handle = None
-            port = int(key)
-            handle = socket.socket().connect((host, port))
-            service_list.append(value)
-        except socket.error as err:
-            # if err.errno != errno.ECONNREFUSED:
-            # service_list.append(value)
-            pass
-        finally:
-            if handle != None:
-                handle.close()
-
-    return service_list
-
-def tryGetSystemMetric(type, func, *args):
-    try:
-        func(*args)
-    except:
-        print type + " does not work, ignore"
-
-DEVICE_CONF = {
-    "cpustat": getCPU,
-    "uptime": getUptime,
-    "meminfo": getMemInfo,
-    "loadavg": getLoadAvg,
-    "ipmicputemp": getIpmiCPUTemp,
-    "network": getInterface,
-    "smartdisk": getSmartDisk,
-    "diskstat": getDiskStat
-}
-
-def main(argv):
-    kafka = None
-    producer = None
-    topic = None
-    try:
-        # read the kafka.ini
-        config = load_config('config.json')
-        print config
-
-        site = config[u'env'].get('site').encode('utf-8')
-        component = config[u'input'].get('component').encode('utf-8')
-        host = socket.getfqdn()
-        print host
-
-        outputs = [s.encode('utf-8') for s in config[u'output']]
-
-        if('kafka' in outputs):
-            kafkaConfig = config[u'output'].get(u'kafka')
-            brokerList = kafkaConfig.get('brokerList')
-            topic = kafkaConfig.get('topic')
-            kafka, producer = kafka_connect(brokerList)
-
-        kafka_dict = {"host": host, "value": 0, "device": ''}
-        services = get_services(host)
-        print services
-        for service in services:
-            kafka_dict[service] = 'true'
-
-        for type, func in DEVICE_CONF.items():
-            print type + ":" + str(func)
-            tryGetSystemMetric(type, func, kafka, kafka_dict, topic)
-
-    except Exception, e:
-        print 'main except: ', e
-
-    finally:
-        kafka_close(kafka, producer)
-        return 0
-
-
-if __name__ == "__main__":
-    sys.exit(main(sys.argv))