You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/13 02:41:41 UTC

incubator-eagle git commit: [EAGLE-838] Resolve defunct process in hadoop jmx script by kill -9

Repository: incubator-eagle
Updated Branches:
  refs/heads/master b12abb38d -> b27998f82


[EAGLE-838] Resolve defunct process in hadoop jmx script by kill -9

https://issues.apache.org/jira/browse/EAGLE-838

- Resolve defunct processes in the Hadoop JMX script by forcing termination with kill -9
- Support a configurable log file path
- Make the JMX reader multi-threaded

Author: Hao Chen <ha...@apache.org>

Closes #733 from haoch/FixDefunctProcess.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b27998f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b27998f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b27998f8

Branch: refs/heads/master
Commit: b27998f822a492c33ed587d729ba894280660ab1
Parents: b12abb3
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 13 10:41:30 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 13 10:41:30 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_config-sample.json               |   4 +-
 .../hadoop_jmx_collector/hadoop_jmx_kafka.py    |   2 +-
 .../hbase_jmx_config-sample.json                |   4 +-
 .../hadoop_jmx_collector/metric_collector.py    | 143 +++++++++++++------
 4 files changed, 105 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
index f5c16fd..a6ddf7d 100755
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
@@ -1,6 +1,8 @@
 {
   "env": {
-    "site": "sandbox"
+    "site": "sandbox",
+    "metric_prefix": "hadoop.",
+    "log_file": "/tmp/hadoop-jmx-collector.log"
   },
   "input": [
     {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 799d351..be7b9c7 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -17,7 +17,7 @@
 #
 
 from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
-import json, logging, fnmatch
+import json, logging, fnmatch, sys
 
 class NNSafeModeMetric(JmxMetricListener):
     def on_metric(self, metric):

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
index 2275ccc..c37a9ae 100644
--- a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
@@ -1,6 +1,8 @@
 {
   "env": {
-    "site": "sandbox"
+    "site": "sandbox",
+    "metric_prefix": "hadoop.",
+    "log_file": "/tmp/hadoop-jmx-collector.log"
   },
   "input": [
     {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c4fc457..6205e1f 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -16,18 +16,18 @@
 # limitations under the License.
 #
 
-import os
 import re
 import time
 import json
 import urllib2
 import sys
 import socket
-import types
 import httplib
 import logging
 import threading
 import fnmatch
+import subprocess
+import os
 
 # load six
 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -37,11 +37,6 @@ import six
 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
 from kafka import KafkaClient, SimpleProducer, SimpleConsumer
 
-logging.basicConfig(level=logging.INFO,
-                    format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
-                    datefmt='%m-%d %H:%M')
-
-
 class Helper:
     def __init__(self):
         pass
@@ -49,24 +44,32 @@ class Helper:
     @staticmethod
     def load_config(config_file="config.json"):
         """
-
         :param config_file:
         :return:
         """
-        abs_file_path = config_file
 
+        abs_file_path = config_file
         if not os.path.isfile(abs_file_path):
             script_dir = os.path.dirname(__file__)
             rel_path = "./" + config_file
             abs_file_path = os.path.join(script_dir, rel_path)
             if not os.path.isfile(abs_file_path):
                 raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
-
-        logging.info("Using configuration file " + abs_file_path)
         f = open(abs_file_path, 'r')
         json_file = f.read()
         f.close()
         config = json.loads(json_file)
+
+        if config["env"].has_key("log_file"):
+            logging.basicConfig(filename=config["env"]["log_file"], filemode='w',level=logging.INFO,
+                                format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+                                datefmt='%m-%d %H:%M')
+        else:
+            logging.basicConfig(level=logging.INFO,
+                            format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+                            datefmt='%m-%d %H:%M')
+
+        logging.info("Loaded config from %s", abs_file_path)
         return config
 
     @staticmethod
@@ -103,12 +106,12 @@ class Helper:
             try:
                 if https:
                     logging.info("Reading https://" + str(url) + path)
-                    c = httplib.HTTPSConnection(url, timeout=28)
+                    c = httplib.HTTPSConnection(url, timeout=30)
                     c.request("GET", path)
                     response = c.getresponse()
                 else:
                     logging.info("Reading http://" + str(url) + path)
-                    response = urllib2.urlopen("http://" + str(url) + path, timeout=28)
+                    response = urllib2.urlopen("http://" + str(url) + path, timeout=30)
                 logging.debug("Got response")
                 result = response.read()
                 break
@@ -197,6 +200,9 @@ class MetricSender(object):
 
 
 class KafkaMetricSender(MetricSender):
+    start_time = time.time()
+    end_time = time.time()
+
     def __init__(self, config):
         super(KafkaMetricSender, self).__init__(config)
         kafka_config = config["output"]["kafka"]
@@ -234,9 +240,11 @@ class KafkaMetricSender(MetricSender):
             return self.default_topic
 
     def open(self):
-        self.kafka_client = KafkaClient(self.broker_list, timeout=59)
+        logging.info("Opening kafka connection for producer")
+        self.kafka_client = KafkaClient(self.broker_list, timeout=55)
         self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
                                              batch_send_every_t=30)
+        self.start_time = time.time()
 
     def send(self, msg):
         if self.debug_enabled:
@@ -245,15 +253,19 @@ class KafkaMetricSender(MetricSender):
         self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
 
     def close(self):
-        logging.info("Totally sent " + str(self.sent_count) + " metric events")
+        logging.info("Closing kafka connection and producer")
         if self.kafka_producer is not None:
             self.kafka_producer.stop()
         if self.kafka_client is not None:
             self.kafka_client.close()
 
+        self.end_time = time.time()
+        logging.info("Totally sent " + str(self.sent_count) + " metric events in "+str(self.end_time - self.start_time)+" sec")
+
 class MetricCollector(threading.Thread):
     filters = []
     config = None
+    closed = False
 
     def __init__(self, config=None):
         threading.Thread.__init__(self)
@@ -303,11 +315,14 @@ class MetricCollector(threading.Thread):
 
     def close(self):
         self.sender.close()
+        self.closed = True
+
+    def is_closed(self):
+        return self.closed
 
     def run(self):
         raise Exception("`run` method should be overrode by sub-class before being called")
 
-
 class Runner(object):
     @staticmethod
     def run(*collectors):
@@ -317,29 +332,44 @@ class Runner(object):
         :param threads:
         :return:
         """
-        argv = sys.argv
-        if len(argv) == 1:
-            config = Helper.load_config()
-        elif len(argv) == 2:
-            config = Helper.load_config(argv[1])
-        else:
-            raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
-
-        for collector in collectors:
-            try:
-                collector.init(config)
-                collector.start()
-            except Exception as e:
-                logging.exception(e)
-
-        for collector in collectors:
-            try:
-                collector.join()
-            except Exception as e:
+        current_pid = os.getpid()
+        try:
+            argv = sys.argv
+            if len(argv) == 1:
+                config = Helper.load_config()
+            elif len(argv) == 2:
+                config = Helper.load_config(argv[1])
+            else:
+                raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+
+            logging.info("PID: %s", current_pid)
+            for collector in collectors:
+                try:
+                    collector.init(config)
+                    collector.start()
+                except Exception as e:
+                    logging.exception(e)
+
+            for collector in collectors:
+                try:
+                    collector.join(timeout = 55)
+                    collector.close()
+                except BaseException as e:
+                    logging.exception(e)
+            exit(0)
+        except BaseException as e:
+            if isinstance(e, SystemExit):
+                logging.info("Exit code: %s", e)
+            else:
                 logging.exception(e)
-            finally:
-                collector.close()
-
+                exit(1)
+                logging.info("Exit code: 1")
+        finally:
+            for collector in collectors:
+                if not collector.is_closed():
+                    collector.close()
+            logging.info("Ensuring process stopped: kill -9 %s", current_pid)
+            subprocess.call(["kill","-9",str(current_pid)])
 
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
@@ -373,13 +403,34 @@ class JmxMetricCollector(MetricCollector):
             listener.init(self)
             self.listeners.append(listener)
 
+    def jmx_reader(self, source):
+        host = source["host"]
+        port=source["port"]
+        https=source["https"]
+        protocol = "https" if https else "http"
+        try:
+            beans = JmxReader(host, port, https).open().get_jmx_beans()
+            self.on_beans(source, beans)
+        except Exception as e:
+            jmx_url = protocol+"://"+str(host) + ":" + str(port)
+            logging.error("Failed to read jmx for " + jmx_url)
+            logging.exception(e)
+
     def run(self):
-        for input in self.input_components:
-            try:
-                beans = JmxReader(input["host"], input["port"], input["https"]).open().get_jmx_beans()
-                self.on_beans(input, beans)
-            except Exception as e:
-                logging.exception("Failed to read jmx for " + str(input))
+        size=str(len(self.input_components))
+        logging.info("Starting jmx reading threads (num: " + size + ")")
+        reader_threads = []
+        for source in self.input_components:
+            reader_thread=threading.Thread(target=self.jmx_reader, args=[source])
+            reader_thread.daemon = True
+            logging.info(reader_thread.name + " starting")
+            reader_thread.start()
+            reader_threads.append(reader_thread)
+        for reader_thread in reader_threads:
+            logging.info(reader_thread.name + " stopping")
+            reader_thread.join(timeout = 55)
+
+        logging.info("Jmx reading threads (num: "+size+") finished")
 
     def filter_bean(self, bean, mbean_domain):
         return mbean_domain in self.selected_domain
@@ -436,6 +487,9 @@ class JmxMetricCollector(MetricCollector):
             metric_prefix_name = '.'.join([i[1] for i in mbean_list])
         return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower()
 
+    def close(self):
+        super(JmxMetricCollector, self).close()
+
 # ========================
 #  Metric Listeners
 # ========================
@@ -450,7 +504,6 @@ class JmxMetricListener:
     def on_metric(self, metric):
         pass
 
-
 # ========================
 #  Metric Filters
 # ========================