You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/11/29 09:27:57 UTC
incubator-eagle git commit: [EAGLE-807] Support configuration path
argument from CLI
Repository: incubator-eagle
Updated Branches:
refs/heads/master fb853dfd8 -> 285524efb
[EAGLE-807] Support configuration path argument from CLI
* Support configuration path argument from CLI and fix config bug
* Fix python script multi-threading problem
Author: Hao Chen <ha...@apache.org>
Closes #697 from haoch/refactorScriptConfig.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/285524ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/285524ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/285524ef
Branch: refs/heads/master
Commit: 285524efb7e898c1ace70c70a62933d8370b1f63
Parents: fb853df
Author: Hao Chen <ha...@apache.org>
Authored: Tue Nov 29 17:27:31 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Tue Nov 29 17:27:31 2016 +0800
----------------------------------------------------------------------
.../hadoop_jmx_collector/metric_collector.py | 60 +++++++++++++-------
1 file changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/285524ef/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index e165ea2..45a3877 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -51,13 +51,16 @@ class Helper:
:param config_file:
:return:
"""
- # read the self-defined filters
- script_dir = os.path.dirname(__file__)
- rel_path = "./" + config_file
- abs_file_path = os.path.join(script_dir, rel_path)
+ abs_file_path = config_file
+
if not os.path.isfile(abs_file_path):
- logging.error(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
- exit(1)
+ script_dir = os.path.dirname(__file__)
+ rel_path = "./" + config_file
+ abs_file_path = os.path.join(script_dir, rel_path)
+ if not os.path.isfile(abs_file_path):
+ raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json")
+
+ logging.info("Using configuration file " + abs_file_path)
f = open(abs_file_path, 'r')
json_file = f.read()
f.close()
@@ -227,22 +230,20 @@ class KafkaMetricSender(MetricSender):
self.kafka_client.close()
class MetricCollector(threading.Thread):
- def __init__(self):
+ def __init__(self,config = None):
threading.Thread.__init__(self)
- self.config = Helper.load_config()
- self.sender = KafkaMetricSender(self.config)
+ self.config = None
+ self.sender = None
self.fqdn = socket.getfqdn()
- self.init(self.config)
def init(self, config):
+ self.config = config
+ self.sender = KafkaMetricSender(self.config)
+ self.sender.open()
pass
def start(self):
- try:
- self.sender.open()
- self.run()
- finally:
- self.sender.close()
+ super(MetricCollector, self).start()
def collect(self, msg):
if not msg.has_key("timestamp"):
@@ -253,27 +254,45 @@ class MetricCollector(threading.Thread):
msg["host"] = self.fqdn
if not msg.has_key("site"):
msg["site"] = self.config["env"]["site"]
-
self.sender.send(msg)
+ def close(self):
+ self.sender.close()
+
def run(self):
raise Exception("`run` method should be overrode by sub-class before being called")
-
class Runner(object):
@staticmethod
- def run(*threads):
+ def run(*collectors):
"""
Execute concurrently
:param threads:
:return:
"""
- for thread in threads:
+ argv = sys.argv
+ if len(argv) == 1:
+ config = Helper.load_config()
+ elif len(argv) == 2:
+ config = Helper.load_config(argv[1])
+ else:
+ raise Exception("Usage: "+argv[0]+" CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+
+ for collector in collectors:
+ try:
+ collector.init(config)
+ collector.start()
+ except Exception as e:
+ logging.exception(e)
+
+ for collector in collectors:
try:
- thread.start()
+ collector.join()
except Exception as e:
logging.exception(e)
+ finally:
+ collector.close()
class JmxMetricCollector(MetricCollector):
selected_domain = None
@@ -281,6 +300,7 @@ class JmxMetricCollector(MetricCollector):
input_components = []
def init(self, config):
+ super(JmxMetricCollector, self).init(config)
self.input_components = config["input"]
for input in self.input_components:
if not input.has_key("host"):