Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/14 08:23:58 UTC

incubator-eagle git commit: [MINOR] Modify timeout settings and support single-process mode

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9ca2cebae -> 880ba738c


[MINOR] Modify timeout settings and support single-process mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/880ba738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/880ba738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/880ba738

Branch: refs/heads/master
Commit: 880ba738c54251734da14411cf873d5aebd13d76
Parents: 9ca2ceb
Author: Hao Chen <ha...@apache.org>
Authored: Wed Dec 14 16:23:45 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Dec 14 16:23:45 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/metric_collector.py    | 32 +++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/880ba738/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c7a5599..0f09a9e 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -241,7 +241,7 @@ class KafkaMetricSender(MetricSender):
 
     def open(self):
         logging.info("Opening kafka connection for producer")
-        self.kafka_client = KafkaClient(self.broker_list, timeout=55)
+        self.kafka_client = KafkaClient(self.broker_list, timeout=50)
         self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500,
                                              batch_send_every_t=30)
         self.start_time = time.time()
@@ -340,18 +340,17 @@ class Runner(object):
                     logging.exception(e)
             for collector in collectors:
                 collector.join(timeout=55)
-            exit(0)
+                collector.close()
         except BaseException as e:
             if not isinstance(e, SystemExit):
                 logging.exception(e)
-                exit(1)
         finally:
             for collector in collectors:
                 if not collector.is_closed():
                     collector.close()
 
     @staticmethod
-    def run(*collectors):
+    def run_async(*collectors):
         config = None
         argv = sys.argv
         if len(argv) == 1:
@@ -368,7 +367,7 @@ class Runner(object):
             logging.info("Starting %s", sub_process)
             sub_process.start()
             logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
-            sub_process.join(timeout = 55)
+            sub_process.join(timeout = 56)
         except BaseException as e:
             logging.exception(e)
         finally:
@@ -378,6 +377,25 @@ class Runner(object):
             logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
             exit(0)
 
+    @staticmethod
+    def run(*collectors):
+        config = None
+        argv = sys.argv
+        current_process=multiprocessing.current_process()
+        if len(argv) == 1:
+            config = Helper.load_config()
+        elif len(argv) == 2:
+            config = Helper.load_config(argv[1])
+        else:
+            raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+        try:
+            Runner.worker(collectors, config)
+        except BaseException as e:
+            logging.exception(e)
+        finally:
+            logging.info("%s (PID: %s) exit", current_process.name, current_process.pid)
+            exit(0)
+
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
     listeners = []
@@ -429,13 +447,13 @@ class JmxMetricCollector(MetricCollector):
         reader_threads = []
         for source in self.input_components:
             reader_thread=threading.Thread(target=self.jmx_reader, args=[source])
-            reader_thread.daemon = True
+            reader_thread.daemon = False
             logging.info(reader_thread.name + " starting")
             reader_thread.start()
             reader_threads.append(reader_thread)
         for reader_thread in reader_threads:
             logging.info(reader_thread.name + " stopping")
-            reader_thread.join(timeout = 55)
+            reader_thread.join(timeout = 45)
 
         logging.info("Jmx reading threads (num: "+size+") finished")