You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/14 08:23:58 UTC
incubator-eagle git commit: [MINOR] Modify timeout settings and support single-process mode
Repository: incubator-eagle
Updated Branches:
refs/heads/master 9ca2cebae -> 880ba738c
[MINOR] Modify timeout settings and support single-process mode
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/880ba738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/880ba738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/880ba738
Branch: refs/heads/master
Commit: 880ba738c54251734da14411cf873d5aebd13d76
Parents: 9ca2ceb
Author: Hao Chen <ha...@apache.org>
Authored: Wed Dec 14 16:23:45 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Dec 14 16:23:45 2016 +0800
----------------------------------------------------------------------
.../hadoop_jmx_collector/metric_collector.py | 32 +++++++++++++++-----
1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/880ba738/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c7a5599..0f09a9e 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -241,7 +241,7 @@ class KafkaMetricSender(MetricSender):
def open(self):
logging.info("Opening kafka connection for producer")
- self.kafka_client = KafkaClient(self.broker_list, timeout=55)
+ self.kafka_client = KafkaClient(self.broker_list, timeout=50)
self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500,
batch_send_every_t=30)
self.start_time = time.time()
@@ -340,18 +340,17 @@ class Runner(object):
logging.exception(e)
for collector in collectors:
collector.join(timeout=55)
- exit(0)
+ collector.close()
except BaseException as e:
if not isinstance(e, SystemExit):
logging.exception(e)
- exit(1)
finally:
for collector in collectors:
if not collector.is_closed():
collector.close()
@staticmethod
- def run(*collectors):
+ def run_async(*collectors):
config = None
argv = sys.argv
if len(argv) == 1:
@@ -368,7 +367,7 @@ class Runner(object):
logging.info("Starting %s", sub_process)
sub_process.start()
logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
- sub_process.join(timeout = 55)
+ sub_process.join(timeout = 56)
except BaseException as e:
logging.exception(e)
finally:
@@ -378,6 +377,25 @@ class Runner(object):
logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
exit(0)
+ @staticmethod
+ def run(*collectors):
+ config = None
+ argv = sys.argv
+ current_process=multiprocessing.current_process()
+ if len(argv) == 1:
+ config = Helper.load_config()
+ elif len(argv) == 2:
+ config = Helper.load_config(argv[1])
+ else:
+ raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+ try:
+ Runner.worker(collectors, config)
+ except BaseException as e:
+ logging.exception(e)
+ finally:
+ logging.info("%s (PID: %s) exit", current_process.name, current_process.pid)
+ exit(0)
+
class JmxMetricCollector(MetricCollector):
selected_domain = None
listeners = []
@@ -429,13 +447,13 @@ class JmxMetricCollector(MetricCollector):
reader_threads = []
for source in self.input_components:
reader_thread=threading.Thread(target=self.jmx_reader, args=[source])
- reader_thread.daemon = True
+ reader_thread.daemon = False
logging.info(reader_thread.name + " starting")
reader_thread.start()
reader_threads.append(reader_thread)
for reader_thread in reader_threads:
logging.info(reader_thread.name + " stopping")
- reader_thread.join(timeout = 55)
+ reader_thread.join(timeout = 45)
logging.info("Jmx reading threads (num: "+size+") finished")