You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/06 09:33:32 UTC

[11/41] ambari git commit: AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)

AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ccf9edbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ccf9edbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ccf9edbc

Branch: refs/heads/branch-3.0-perf
Commit: ccf9edbc14d9d38f1924ab1defa8ca8b6f73f3f9
Parents: 4f5ac09
Author: Eugene Chekanskiy <ec...@hortonworks.com>
Authored: Mon Apr 3 16:46:27 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Apr 6 12:32:56 2017 +0300

----------------------------------------------------------------------
 .../ambari_agent/StatusCommandsExecutor.py      | 209 +++++++++----------
 .../src/main/python/ambari_agent/main.py        |   5 +-
 2 files changed, 98 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/ccf9edbc/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 04a3e85..142e7ca 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -49,7 +49,7 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.config = config
     self.actionQueue = actionQueue
     self.statusCommandQueue = Queue.Queue()
-    self.need_relaunch = False
+    self.need_relaunch = (False, None) # tuple (bool, str|None): flag to relaunch and the reason for the relaunch
 
   def put_commands(self, commands):
     with self.statusCommandQueue.mutex:
@@ -88,12 +88,13 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.config = config
     self.actionQueue = actionQueue
 
-    self._can_relaunch_lock = threading.RLock()
-    self._can_relaunch = True
+    self.can_relaunch = True
 
     # used to prevent the queues from being used while a new one is created, so that threads
     # do not end up working with a mix of old and new queues
     self.usage_lock = threading.RLock()
+    # protects against simultaneous killing/creating from different threads.
+    self.kill_lock = threading.RLock()
 
     self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
     self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
@@ -107,42 +108,32 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     self.mp_result_logs = multiprocessing.Queue()
     self.mp_task_queue = multiprocessing.Queue()
 
-  @property
-  def can_relaunch(self):
-    with self._can_relaunch_lock:
-      return self._can_relaunch
-
-  @can_relaunch.setter
-  def can_relaunch(self, value):
-    with self._can_relaunch_lock:
-      self._can_relaunch = value
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
+  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
     """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _get_log_messages(self):
-    """
-    Returns list of (level, message, exception) log messages.
-
-    :return: list of (level, message, exception)
+    Read everything that is available in the queue. Uses the unreliable multiprocessing.Queue methods (qsize, empty),
+    so it contains a very simple protection against blocking too long in this method: it will try to get all available
+    items for no more than ``max_time`` seconds, and will return after ``max_empty_count`` calls of
+    ``target_queue.get(False)`` in a row that raised a ``Queue.Empty`` exception. Note the ``read_break`` argument:
+    with the default values this method is able to read ~4500 ``range(1,10000)`` objects in 5 seconds, so don't fill
+    the queue too fast.
+
+    :param target_queue: queue to read from
+    :param max_time: maximum time to spend in this method call
+    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
+    :param read_break: time to wait before next read cycle iteration
+    :return: list of resulting objects
     """
     results = []
+    _empty = 0
+    _start = time.time()
     with self.usage_lock:
       try:
-        while not self.mp_result_logs.empty():
+        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
           try:
-            results.append(self.mp_result_logs.get(False))
+            results.append(target_queue.get(False))
+            _empty = 0
+            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
           except Queue.Empty:
-            pass
+            _empty += 1
           except IOError:
             pass
           except UnicodeDecodeError:
@@ -151,11 +142,23 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
         pass
     return results
 
+  def _log_message(self, level, message, exception=None):
+    """
+    Put a log message on the logging queue. Must only be used for logging from the child process (in _worker_process_target).
+
+    :param level:
+    :param message:
+    :param exception:
+    :return:
+    """
+    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
+    self.mp_result_logs.put((level, result_message, exception))
+
   def _process_logs(self):
     """
     Get all available at this moment logs and prints them to logger.
     """
-    for level, message, exception in self._get_log_messages():
+    for level, message, exception in self._drain_queue(self.mp_result_logs):
       if level == logging.ERROR:
         logger.debug(message, exc_info=exception)
       if level == logging.WARN:
@@ -256,16 +259,6 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :return:
     """
     with self.usage_lock:
-      if not self.mp_task_queue.empty():
-        status_command_queue_size = 0
-        try:
-          while not self.mp_task_queue.empty():
-            self.mp_task_queue.get(False)
-            status_command_queue_size += 1
-        except Queue.Empty:
-          pass
-
-        logger.info("Number of status commands removed from queue : " + str(status_command_queue_size))
       for command in commands:
         logger.info("Adding " + command['commandType'] + " for component " + \
                     command['componentName'] + " of service " + \
@@ -276,43 +269,29 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
 
   def process_results(self):
     """
-    Process all the results from the internal worker
+    Process all the results from the SCE worker process.
     """
     self._process_logs()
-    for result in self._get_results():
+    results = self._drain_queue(self.mp_result_queue)
+    logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
+    for result in results:
       try:
         self.actionQueue.process_status_command_result(result)
       except UnicodeDecodeError:
         pass
 
-  def _get_results(self):
-    """
-    Get all available results for status commands.
-
-    :return: list of results
-    """
-    results = []
-    with self.usage_lock:
-      try:
-        while not self.mp_result_queue.empty():
-          try:
-            results.append(self.mp_result_queue.get(False))
-          except Queue.Empty:
-            pass
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
   @property
   def need_relaunch(self):
     """
    Indicates whether the worker process needs to be relaunched because it timed out, died, or was never created.
+
+    :return: tuple (bool, str|None) with the relaunch flag and the reason for the relaunch
     """
-    return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive()
+    if not self.worker_process or not self.worker_process.is_alive():
+      return True, "WORKER_DEAD"
+    elif self.timedOutEvent.is_set():
+      return True, "COMMAND_TIMEOUT"
+    return False, None
 
   def relaunch(self, reason=None):
     """
@@ -321,13 +300,15 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :param reason: reason of restart
     :return:
     """
-    if self.can_relaunch:
-      self.kill(reason)
-      self.worker_process = multiprocessing.Process(target=self._worker_process_target)
-      self.worker_process.start()
-      logger.info("Started process with pid {0}".format(self.worker_process.pid))
-    else:
-      logger.debug("Relaunch does not allowed, can not relaunch")
+    with self.kill_lock:
+      logger.info("Relaunching child process reason:" + str(reason))
+      if self.can_relaunch:
+        self.kill(reason)
+        self.worker_process = multiprocessing.Process(target=self._worker_process_target)
+        self.worker_process.start()
+        logger.info("Started process with pid {0}".format(self.worker_process.pid))
+      else:
+        logger.debug("Relaunch does not allowed, can not relaunch")
 
   def kill(self, reason=None, can_relaunch=True):
     """
@@ -339,43 +320,43 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
     :param reason: reason of killing
     :return:
     """
-    logger.info("Killing child process reason:" + str(reason))
-    self.can_relaunch = can_relaunch
-
-    if not self.can_relaunch:
-      logger.info("Killing without possibility to relaunch...")
-
-    # try graceful stop, otherwise hard-kill
-    if self.worker_process and self.worker_process.is_alive():
-      self.mustDieEvent.set()
-      self.worker_process.join(timeout=3)
-      if self.worker_process.is_alive():
-        os.kill(self.worker_process.pid, signal.SIGKILL)
-        logger.info("Child process killed by -9")
+    with self.kill_lock:
+      self.can_relaunch = can_relaunch
+
+      if not self.can_relaunch:
+        logger.info("Killing without possibility to relaunch...")
+
+      # try graceful stop, otherwise hard-kill
+      if self.worker_process and self.worker_process.is_alive():
+        self.mustDieEvent.set()
+        self.worker_process.join(timeout=3)
+        if self.worker_process.is_alive():
+          os.kill(self.worker_process.pid, signal.SIGKILL)
+          logger.info("Child process killed by -9")
+        else:
+          # get log messages only if we died gracefully, otherwise we would risk blocking here forever; in most cases
+          # this call does nothing, as all logs are processed in the ActionQueue loop
+          self._process_logs()
+          logger.info("Child process died gracefully")
       else:
-        # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
-        # this call will do nothing, as all logs will be processed in ActionQueue loop
-        self._process_logs()
-        logger.info("Child process died gracefully")
-    else:
-      logger.info("Child process already dead")
-
-    # close queues and acquire usage lock
-    # closing both sides of pipes here, we need this hack in case of blocking on recv() call
-    self.mp_result_queue.close()
-    self.mp_result_queue._writer.close()
-    self.mp_result_logs.close()
-    self.mp_result_logs._writer.close()
-    self.mp_task_queue.close()
-    self.mp_task_queue._writer.close()
-
-    with self.usage_lock:
-      self.mp_result_queue.join_thread()
-      self.mp_result_queue = multiprocessing.Queue()
-      self.mp_task_queue.join_thread()
-      self.mp_task_queue = multiprocessing.Queue()
-      self.mp_result_logs.join_thread()
-      self.mp_result_logs = multiprocessing.Queue()
-      self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-      self.mustDieEvent.clear()
-      self.timedOutEvent.clear()
+        logger.info("Child process already dead")
+
+      # close queues and acquire usage lock
+      # closing both sides of pipes here, we need this hack in case of blocking on recv() call
+      self.mp_result_queue.close()
+      self.mp_result_queue._writer.close()
+      self.mp_result_logs.close()
+      self.mp_result_logs._writer.close()
+      self.mp_task_queue.close()
+      self.mp_task_queue._writer.close()
+
+      with self.usage_lock:
+        self.mp_result_queue.join_thread()
+        self.mp_result_queue = multiprocessing.Queue()
+        self.mp_task_queue.join_thread()
+        self.mp_task_queue = multiprocessing.Queue()
+        self.mp_result_logs.join_thread()
+        self.mp_result_logs = multiprocessing.Queue()
+        self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
+        self.mustDieEvent.clear()
+        self.timedOutEvent.clear()

http://git-wip-us.apache.org/repos/asf/ambari/blob/ccf9edbc/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 923c570..19c92b0 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -352,8 +352,9 @@ def run_threads(server_hostname, heartbeat_stop_callback):
   while controller.is_alive():
     time.sleep(0.1)
 
-    if controller.get_status_commands_executor().need_relaunch:
-      controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
+    need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
+    if need_relaunch:
+      controller.get_status_commands_executor().relaunch(reason)
 
   controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)