You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/13 02:41:41 UTC
incubator-eagle git commit: [EAGLE-838] Resolve defunct process in
hadoop jmx script by kill -9
Repository: incubator-eagle
Updated Branches:
refs/heads/master b12abb38d -> b27998f82
[EAGLE-838] Resolve defunct process in hadoop jmx script by kill -9
https://issues.apache.org/jira/browse/EAGLE-838
- Resolve defunct process in hadoop jmx script by kill -9
- Support configurable log file path
- Improve JMX reader to use multi-threading
Author: Hao Chen <ha...@apache.org>
Closes #733 from haoch/FixDefunctProcess.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b27998f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b27998f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b27998f8
Branch: refs/heads/master
Commit: b27998f822a492c33ed587d729ba894280660ab1
Parents: b12abb3
Author: Hao Chen <ha...@apache.org>
Authored: Tue Dec 13 10:41:30 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Dec 13 10:41:30 2016 +0800
----------------------------------------------------------------------
.../hadoop_jmx_config-sample.json | 4 +-
.../hadoop_jmx_collector/hadoop_jmx_kafka.py | 2 +-
.../hbase_jmx_config-sample.json | 4 +-
.../hadoop_jmx_collector/metric_collector.py | 143 +++++++++++++------
4 files changed, 105 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
index f5c16fd..a6ddf7d 100755
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json
@@ -1,6 +1,8 @@
{
"env": {
- "site": "sandbox"
+ "site": "sandbox",
+ "metric_prefix": "hadoop.",
+ "log_file": "/tmp/hadoop-jmx-collector.log"
},
"input": [
{
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
index 799d351..be7b9c7 100644
--- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
+++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py
@@ -17,7 +17,7 @@
#
from metric_collector import JmxMetricCollector,JmxMetricListener,Runner
-import json, logging, fnmatch
+import json, logging, fnmatch, sys
class NNSafeModeMetric(JmxMetricListener):
def on_metric(self, metric):
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
index 2275ccc..c37a9ae 100644
--- a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
+++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json
@@ -1,6 +1,8 @@
{
"env": {
- "site": "sandbox"
+ "site": "sandbox",
+ "metric_prefix": "hadoop.",
+ "log_file": "/tmp/hadoop-jmx-collector.log"
},
"input": [
{
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c4fc457..6205e1f 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -16,18 +16,18 @@
# limitations under the License.
#
-import os
import re
import time
import json
import urllib2
import sys
import socket
-import types
import httplib
import logging
import threading
import fnmatch
+import subprocess
+import os
# load six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -37,11 +37,6 @@ import six
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python'))
from kafka import KafkaClient, SimpleProducer, SimpleConsumer
-logging.basicConfig(level=logging.INFO,
- format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s',
- datefmt='%m-%d %H:%M')
-
-
class Helper:
def __init__(self):
pass
@@ -49,24 +44,32 @@ class Helper:
@staticmethod
def load_config(config_file="config.json"):
"""
-
:param config_file:
:return:
"""
- abs_file_path = config_file
+ abs_file_path = config_file
if not os.path.isfile(abs_file_path):
script_dir = os.path.dirname(__file__)
rel_path = "./" + config_file
abs_file_path = os.path.join(script_dir, rel_path)
if not os.path.isfile(abs_file_path):
raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
-
- logging.info("Using configuration file " + abs_file_path)
f = open(abs_file_path, 'r')
json_file = f.read()
f.close()
config = json.loads(json_file)
+
+ if config["env"].has_key("log_file"):
+ logging.basicConfig(filename=config["env"]["log_file"], filemode='w',level=logging.INFO,
+ format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+ datefmt='%m-%d %H:%M')
+ else:
+ logging.basicConfig(level=logging.INFO,
+ format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s',
+ datefmt='%m-%d %H:%M')
+
+ logging.info("Loaded config from %s", abs_file_path)
return config
@staticmethod
@@ -103,12 +106,12 @@ class Helper:
try:
if https:
logging.info("Reading https://" + str(url) + path)
- c = httplib.HTTPSConnection(url, timeout=28)
+ c = httplib.HTTPSConnection(url, timeout=30)
c.request("GET", path)
response = c.getresponse()
else:
logging.info("Reading http://" + str(url) + path)
- response = urllib2.urlopen("http://" + str(url) + path, timeout=28)
+ response = urllib2.urlopen("http://" + str(url) + path, timeout=30)
logging.debug("Got response")
result = response.read()
break
@@ -197,6 +200,9 @@ class MetricSender(object):
class KafkaMetricSender(MetricSender):
+ start_time = time.time()
+ end_time = time.time()
+
def __init__(self, config):
super(KafkaMetricSender, self).__init__(config)
kafka_config = config["output"]["kafka"]
@@ -234,9 +240,11 @@ class KafkaMetricSender(MetricSender):
return self.default_topic
def open(self):
- self.kafka_client = KafkaClient(self.broker_list, timeout=59)
+ logging.info("Opening kafka connection for producer")
+ self.kafka_client = KafkaClient(self.broker_list, timeout=55)
self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500,
batch_send_every_t=30)
+ self.start_time = time.time()
def send(self, msg):
if self.debug_enabled:
@@ -245,15 +253,19 @@ class KafkaMetricSender(MetricSender):
self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg))
def close(self):
- logging.info("Totally sent " + str(self.sent_count) + " metric events")
+ logging.info("Closing kafka connection and producer")
if self.kafka_producer is not None:
self.kafka_producer.stop()
if self.kafka_client is not None:
self.kafka_client.close()
+ self.end_time = time.time()
+ logging.info("Totally sent " + str(self.sent_count) + " metric events in "+str(self.end_time - self.start_time)+" sec")
+
class MetricCollector(threading.Thread):
filters = []
config = None
+ closed = False
def __init__(self, config=None):
threading.Thread.__init__(self)
@@ -303,11 +315,14 @@ class MetricCollector(threading.Thread):
def close(self):
self.sender.close()
+ self.closed = True
+
+ def is_closed(self):
+ return self.closed
def run(self):
raise Exception("`run` method should be overrode by sub-class before being called")
-
class Runner(object):
@staticmethod
def run(*collectors):
@@ -317,29 +332,44 @@ class Runner(object):
:param threads:
:return:
"""
- argv = sys.argv
- if len(argv) == 1:
- config = Helper.load_config()
- elif len(argv) == 2:
- config = Helper.load_config(argv[1])
- else:
- raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
-
- for collector in collectors:
- try:
- collector.init(config)
- collector.start()
- except Exception as e:
- logging.exception(e)
-
- for collector in collectors:
- try:
- collector.join()
- except Exception as e:
+ current_pid = os.getpid()
+ try:
+ argv = sys.argv
+ if len(argv) == 1:
+ config = Helper.load_config()
+ elif len(argv) == 2:
+ config = Helper.load_config(argv[1])
+ else:
+ raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+
+ logging.info("PID: %s", current_pid)
+ for collector in collectors:
+ try:
+ collector.init(config)
+ collector.start()
+ except Exception as e:
+ logging.exception(e)
+
+ for collector in collectors:
+ try:
+ collector.join(timeout = 55)
+ collector.close()
+ except BaseException as e:
+ logging.exception(e)
+ exit(0)
+ except BaseException as e:
+ if isinstance(e, SystemExit):
+ logging.info("Exit code: %s", e)
+ else:
logging.exception(e)
- finally:
- collector.close()
-
+ exit(1)
+ logging.info("Exit code: 1")
+ finally:
+ for collector in collectors:
+ if not collector.is_closed():
+ collector.close()
+ logging.info("Ensuring process stopped: kill -9 %s", current_pid)
+ subprocess.call(["kill","-9",str(current_pid)])
class JmxMetricCollector(MetricCollector):
selected_domain = None
@@ -373,13 +403,34 @@ class JmxMetricCollector(MetricCollector):
listener.init(self)
self.listeners.append(listener)
+ def jmx_reader(self, source):
+ host = source["host"]
+ port=source["port"]
+ https=source["https"]
+ protocol = "https" if https else "http"
+ try:
+ beans = JmxReader(host, port, https).open().get_jmx_beans()
+ self.on_beans(source, beans)
+ except Exception as e:
+ jmx_url = protocol+"://"+str(host) + ":" + str(port)
+ logging.error("Failed to read jmx for " + jmx_url)
+ logging.exception(e)
+
def run(self):
- for input in self.input_components:
- try:
- beans = JmxReader(input["host"], input["port"], input["https"]).open().get_jmx_beans()
- self.on_beans(input, beans)
- except Exception as e:
- logging.exception("Failed to read jmx for " + str(input))
+ size=str(len(self.input_components))
+ logging.info("Starting jmx reading threads (num: " + size + ")")
+ reader_threads = []
+ for source in self.input_components:
+ reader_thread=threading.Thread(target=self.jmx_reader, args=[source])
+ reader_thread.daemon = True
+ logging.info(reader_thread.name + " starting")
+ reader_thread.start()
+ reader_threads.append(reader_thread)
+ for reader_thread in reader_threads:
+ logging.info(reader_thread.name + " stopping")
+ reader_thread.join(timeout = 55)
+
+ logging.info("Jmx reading threads (num: "+size+") finished")
def filter_bean(self, bean, mbean_domain):
return mbean_domain in self.selected_domain
@@ -436,6 +487,9 @@ class JmxMetricCollector(MetricCollector):
metric_prefix_name = '.'.join([i[1] for i in mbean_list])
return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower()
+ def close(self):
+ super(JmxMetricCollector, self).close()
+
# ========================
# Metric Listeners
# ========================
@@ -450,7 +504,6 @@ class JmxMetricListener:
def on_metric(self, metric):
pass
-
# ========================
# Metric Filters
# ========================