You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/13 09:00:05 UTC

incubator-eagle git commit: [EAGLE-838] Use a subprocess for the metric collector to avoid defunct processes

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 7c6315311 -> f5f07402d


[EAGLE-838] Use a subprocess for the metric collector to avoid defunct processes

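Use a subprocess for the metric collector to avoid defunct processes.

The collectors now run in a child process started with Python's `multiprocessing` module. The parent waits up to 55 seconds for it (`join(timeout=55)`) and calls `terminate()` if it is still alive. Previously, the process ended itself with `kill -9` on its own PID. The patch also switches the Kafka `SimpleProducer` from batch sending to immediate sending (`batch_send=False`).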

https://issues.apache.org/jira/browse/EAGLE-838

Author: Hao Chen <ha...@apache.org>

Closes #737 from haoch/UseSubProcessForMetricCollector.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f5f07402
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f5f07402
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f5f07402

Branch: refs/heads/master
Commit: f5f07402d40c19962ea6650092509c32aafb12e6
Parents: 7c63153
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 13 17:00:00 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 13 17:00:00 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/metric_collector.py    | 67 +++++++++++---------
 1 file changed, 37 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f5f07402/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 6205e1f..c7a5599 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -26,8 +26,8 @@ import httplib
 import logging
 import threading
 import fnmatch
-import subprocess
 import os
+import multiprocessing
 
 # load six
 sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -242,7 +242,7 @@ class KafkaMetricSender(MetricSender):
     def open(self):
         logging.info("Opening kafka connection for producer")
         self.kafka_client = KafkaClient(self.broker_list, timeout=55)
-        self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
+        self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500,
                                              batch_send_every_t=30)
         self.start_time = time.time()
 
@@ -325,51 +325,58 @@ class MetricCollector(threading.Thread):
 
 class Runner(object):
     @staticmethod
-    def run(*collectors):
+    def worker(collectors, config):
         """
-        Execute concurrently
-
-        :param threads:
-        :return:
-        """
-        current_pid = os.getpid()
+       Execute concurrently
+       :param threads:
+       :return:
+       """
         try:
-            argv = sys.argv
-            if len(argv) == 1:
-                config = Helper.load_config()
-            elif len(argv) == 2:
-                config = Helper.load_config(argv[1])
-            else:
-                raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
-
-            logging.info("PID: %s", current_pid)
             for collector in collectors:
                 try:
                     collector.init(config)
                     collector.start()
                 except Exception as e:
                     logging.exception(e)
-
             for collector in collectors:
-                try:
-                    collector.join(timeout = 55)
-                    collector.close()
-                except BaseException as e:
-                    logging.exception(e)
+                collector.join(timeout=55)
             exit(0)
         except BaseException as e:
-            if isinstance(e, SystemExit):
-                logging.info("Exit code: %s", e)
-            else:
+            if not isinstance(e, SystemExit):
                 logging.exception(e)
                 exit(1)
-                logging.info("Exit code: 1")
         finally:
             for collector in collectors:
                 if not collector.is_closed():
                     collector.close()
-            logging.info("Ensuring process stopped: kill -9 %s", current_pid)
-            subprocess.call(["kill","-9",str(current_pid)])
+
+    @staticmethod
+    def run(*collectors):
+        config = None
+        argv = sys.argv
+        if len(argv) == 1:
+            config = Helper.load_config()
+        elif len(argv) == 2:
+            config = Helper.load_config(argv[1])
+        else:
+            raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+        current_process = multiprocessing.current_process()
+        sub_process = multiprocessing.Process(target=Runner.worker, args=[collectors,config])
+        sub_process.daemon = False
+        sub_process.name = "CollectorSubprocess"
+        try:
+            logging.info("Starting %s", sub_process)
+            sub_process.start()
+            logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
+            sub_process.join(timeout = 55)
+        except BaseException as e:
+            logging.exception(e)
+        finally:
+            if sub_process.is_alive():
+                logging.info("%s is still alive, terminating", sub_process)
+                sub_process.terminate()
+            logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
+            exit(0)
 
 class JmxMetricCollector(MetricCollector):
     selected_domain = None