You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/13 09:00:05 UTC
incubator-eagle git commit: [EAGLE-838] Use subprocess for metric collector to avoid defunct process
Repository: incubator-eagle
Updated Branches:
refs/heads/master 7c6315311 -> f5f07402d
[EAGLE-838] Use subprocess for metric collector to avoid defunct process
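Run the metric collectors in a child process (multiprocessing.Process) instead of having the collector kill its own PID with "kill -9", to avoid leaving a defunct process behind.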
https://issues.apache.org/jira/browse/EAGLE-838
Author: Hao Chen <ha...@apache.org>
Closes #737 from haoch/UseSubProcessForMetricCollector.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f5f07402
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f5f07402
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f5f07402
Branch: refs/heads/master
Commit: f5f07402d40c19962ea6650092509c32aafb12e6
Parents: 7c63153
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 13 17:00:00 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 13 17:00:00 2016 +0800
----------------------------------------------------------------------
.../hadoop_jmx_collector/metric_collector.py | 67 +++++++++++---------
1 file changed, 37 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f5f07402/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 6205e1f..c7a5599 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -26,8 +26,8 @@ import httplib
import logging
import threading
import fnmatch
-import subprocess
import os
+import multiprocessing
# load six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -242,7 +242,7 @@ class KafkaMetricSender(MetricSender):
def open(self):
logging.info("Opening kafka connection for producer")
self.kafka_client = KafkaClient(self.broker_list, timeout=55)
- self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
+ self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500,
batch_send_every_t=30)
self.start_time = time.time()
@@ -325,51 +325,58 @@ class MetricCollector(threading.Thread):
class Runner(object):
@staticmethod
- def run(*collectors):
+ def worker(collectors, config):
"""
- Execute concurrently
-
- :param threads:
- :return:
- """
- current_pid = os.getpid()
+ Execute concurrently
+ :param threads:
+ :return:
+ """
try:
- argv = sys.argv
- if len(argv) == 1:
- config = Helper.load_config()
- elif len(argv) == 2:
- config = Helper.load_config(argv[1])
- else:
- raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
-
- logging.info("PID: %s", current_pid)
for collector in collectors:
try:
collector.init(config)
collector.start()
except Exception as e:
logging.exception(e)
-
for collector in collectors:
- try:
- collector.join(timeout = 55)
- collector.close()
- except BaseException as e:
- logging.exception(e)
+ collector.join(timeout=55)
exit(0)
except BaseException as e:
- if isinstance(e, SystemExit):
- logging.info("Exit code: %s", e)
- else:
+ if not isinstance(e, SystemExit):
logging.exception(e)
exit(1)
- logging.info("Exit code: 1")
finally:
for collector in collectors:
if not collector.is_closed():
collector.close()
- logging.info("Ensuring process stopped: kill -9 %s", current_pid)
- subprocess.call(["kill","-9",str(current_pid)])
+
+ @staticmethod
+ def run(*collectors):
+ config = None
+ argv = sys.argv
+ if len(argv) == 1:
+ config = Helper.load_config()
+ elif len(argv) == 2:
+ config = Helper.load_config(argv[1])
+ else:
+ raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+ current_process = multiprocessing.current_process()
+ sub_process = multiprocessing.Process(target=Runner.worker, args=[collectors,config])
+ sub_process.daemon = False
+ sub_process.name = "CollectorSubprocess"
+ try:
+ logging.info("Starting %s", sub_process)
+ sub_process.start()
+ logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
+ sub_process.join(timeout = 55)
+ except BaseException as e:
+ logging.exception(e)
+ finally:
+ if sub_process.is_alive():
+ logging.info("%s is still alive, terminating", sub_process)
+ sub_process.terminate()
+ logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
+ exit(0)
class JmxMetricCollector(MetricCollector):
selected_domain = None