You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/04/06 09:33:32 UTC
[11/41] ambari git commit: AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)
AMBARI-20632. With multi-process StatusCommandsExecutor, Status commands are taking too long to report back (echekanskiy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ccf9edbc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ccf9edbc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ccf9edbc
Branch: refs/heads/branch-3.0-perf
Commit: ccf9edbc14d9d38f1924ab1defa8ca8b6f73f3f9
Parents: 4f5ac09
Author: Eugene Chekanskiy <ec...@hortonworks.com>
Authored: Mon Apr 3 16:46:27 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Thu Apr 6 12:32:56 2017 +0300
----------------------------------------------------------------------
.../ambari_agent/StatusCommandsExecutor.py | 209 +++++++++----------
.../src/main/python/ambari_agent/main.py | 5 +-
2 files changed, 98 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ccf9edbc/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 04a3e85..142e7ca 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -49,7 +49,7 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
self.config = config
self.actionQueue = actionQueue
self.statusCommandQueue = Queue.Queue()
- self.need_relaunch = False
+ self.need_relaunch = (False, None) # tuple (bool, str|None) with flag to relaunch and reason of relaunch
def put_commands(self, commands):
with self.statusCommandQueue.mutex:
@@ -88,12 +88,13 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
self.config = config
self.actionQueue = actionQueue
- self._can_relaunch_lock = threading.RLock()
- self._can_relaunch = True
+ self.can_relaunch = True
# used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
# old and new queues
self.usage_lock = threading.RLock()
+ # protects against simultaneous killing/creating from different threads.
+ self.kill_lock = threading.RLock()
self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
@@ -107,42 +108,32 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
self.mp_result_logs = multiprocessing.Queue()
self.mp_task_queue = multiprocessing.Queue()
- @property
- def can_relaunch(self):
- with self._can_relaunch_lock:
- return self._can_relaunch
-
- @can_relaunch.setter
- def can_relaunch(self, value):
- with self._can_relaunch_lock:
- self._can_relaunch = value
-
- def _log_message(self, level, message, exception=None):
- """
- Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
- :param level:
- :param message:
- :param exception:
- :return:
+ def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
"""
- result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
- self.mp_result_logs.put((level, result_message, exception))
-
- def _get_log_messages(self):
- """
- Returns list of (level, message, exception) log messages.
-
- :return: list of (level, message, exception)
+ Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
+ extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
+ than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
+ ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
+ ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast.
+
+ :param target_queue: queue to read from
+ :param max_time: maximum time to spend in this method call
+ :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
+ :param read_break: time to wait before next read cycle iteration
+ :return: list of resulting objects
"""
results = []
+ _empty = 0
+ _start = time.time()
with self.usage_lock:
try:
- while not self.mp_result_logs.empty():
+ while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
try:
- results.append(self.mp_result_logs.get(False))
+ results.append(target_queue.get(False))
+ _empty = 0
+ time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
except Queue.Empty:
- pass
+ _empty += 1
except IOError:
pass
except UnicodeDecodeError:
@@ -151,11 +142,23 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
pass
return results
+ def _log_message(self, level, message, exception=None):
+ """
+ Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
+
+ :param level:
+ :param message:
+ :param exception:
+ :return:
+ """
+ result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
+ self.mp_result_logs.put((level, result_message, exception))
+
def _process_logs(self):
"""
Get all available at this moment logs and prints them to logger.
"""
- for level, message, exception in self._get_log_messages():
+ for level, message, exception in self._drain_queue(self.mp_result_logs):
if level == logging.ERROR:
logger.debug(message, exc_info=exception)
if level == logging.WARN:
@@ -256,16 +259,6 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
:return:
"""
with self.usage_lock:
- if not self.mp_task_queue.empty():
- status_command_queue_size = 0
- try:
- while not self.mp_task_queue.empty():
- self.mp_task_queue.get(False)
- status_command_queue_size += 1
- except Queue.Empty:
- pass
-
- logger.info("Number of status commands removed from queue : " + str(status_command_queue_size))
for command in commands:
logger.info("Adding " + command['commandType'] + " for component " + \
command['componentName'] + " of service " + \
@@ -276,43 +269,29 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
def process_results(self):
"""
- Process all the results from the internal worker
+ Process all the results from the SCE worker process.
"""
self._process_logs()
- for result in self._get_results():
+ results = self._drain_queue(self.mp_result_queue)
+ logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
+ for result in results:
try:
self.actionQueue.process_status_command_result(result)
except UnicodeDecodeError:
pass
- def _get_results(self):
- """
- Get all available results for status commands.
-
- :return: list of results
- """
- results = []
- with self.usage_lock:
- try:
- while not self.mp_result_queue.empty():
- try:
- results.append(self.mp_result_queue.get(False))
- except Queue.Empty:
- pass
- except IOError:
- pass
- except UnicodeDecodeError:
- pass
- except IOError:
- pass
- return results
-
@property
def need_relaunch(self):
"""
Indicates if process need to be relaunched due to timeout or it is dead or even was not created.
+
+ :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch
"""
- return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive()
+ if not self.worker_process or not self.worker_process.is_alive():
+ return True, "WORKER_DEAD"
+ elif self.timedOutEvent.is_set():
+ return True, "COMMAND_TIMEOUT"
+ return False, None
def relaunch(self, reason=None):
"""
@@ -321,13 +300,15 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
:param reason: reason of restart
:return:
"""
- if self.can_relaunch:
- self.kill(reason)
- self.worker_process = multiprocessing.Process(target=self._worker_process_target)
- self.worker_process.start()
- logger.info("Started process with pid {0}".format(self.worker_process.pid))
- else:
- logger.debug("Relaunch does not allowed, can not relaunch")
+ with self.kill_lock:
+ logger.info("Relaunching child process reason:" + str(reason))
+ if self.can_relaunch:
+ self.kill(reason)
+ self.worker_process = multiprocessing.Process(target=self._worker_process_target)
+ self.worker_process.start()
+ logger.info("Started process with pid {0}".format(self.worker_process.pid))
+ else:
+ logger.debug("Relaunch does not allowed, can not relaunch")
def kill(self, reason=None, can_relaunch=True):
"""
@@ -339,43 +320,43 @@ class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
:param reason: reason of killing
:return:
"""
- logger.info("Killing child process reason:" + str(reason))
- self.can_relaunch = can_relaunch
-
- if not self.can_relaunch:
- logger.info("Killing without possibility to relaunch...")
-
- # try graceful stop, otherwise hard-kill
- if self.worker_process and self.worker_process.is_alive():
- self.mustDieEvent.set()
- self.worker_process.join(timeout=3)
- if self.worker_process.is_alive():
- os.kill(self.worker_process.pid, signal.SIGKILL)
- logger.info("Child process killed by -9")
+ with self.kill_lock:
+ self.can_relaunch = can_relaunch
+
+ if not self.can_relaunch:
+ logger.info("Killing without possibility to relaunch...")
+
+ # try graceful stop, otherwise hard-kill
+ if self.worker_process and self.worker_process.is_alive():
+ self.mustDieEvent.set()
+ self.worker_process.join(timeout=3)
+ if self.worker_process.is_alive():
+ os.kill(self.worker_process.pid, signal.SIGKILL)
+ logger.info("Child process killed by -9")
+ else:
+ # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
+ # this call will do nothing, as all logs will be processed in ActionQueue loop
+ self._process_logs()
+ logger.info("Child process died gracefully")
else:
- # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
- # this call will do nothing, as all logs will be processed in ActionQueue loop
- self._process_logs()
- logger.info("Child process died gracefully")
- else:
- logger.info("Child process already dead")
-
- # close queues and acquire usage lock
- # closing both sides of pipes here, we need this hack in case of blocking on recv() call
- self.mp_result_queue.close()
- self.mp_result_queue._writer.close()
- self.mp_result_logs.close()
- self.mp_result_logs._writer.close()
- self.mp_task_queue.close()
- self.mp_task_queue._writer.close()
-
- with self.usage_lock:
- self.mp_result_queue.join_thread()
- self.mp_result_queue = multiprocessing.Queue()
- self.mp_task_queue.join_thread()
- self.mp_task_queue = multiprocessing.Queue()
- self.mp_result_logs.join_thread()
- self.mp_result_logs = multiprocessing.Queue()
- self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
- self.mustDieEvent.clear()
- self.timedOutEvent.clear()
+ logger.info("Child process already dead")
+
+ # close queues and acquire usage lock
+ # closing both sides of pipes here, we need this hack in case of blocking on recv() call
+ self.mp_result_queue.close()
+ self.mp_result_queue._writer.close()
+ self.mp_result_logs.close()
+ self.mp_result_logs._writer.close()
+ self.mp_task_queue.close()
+ self.mp_task_queue._writer.close()
+
+ with self.usage_lock:
+ self.mp_result_queue.join_thread()
+ self.mp_result_queue = multiprocessing.Queue()
+ self.mp_task_queue.join_thread()
+ self.mp_task_queue = multiprocessing.Queue()
+ self.mp_result_logs.join_thread()
+ self.mp_result_logs = multiprocessing.Queue()
+ self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
+ self.mustDieEvent.clear()
+ self.timedOutEvent.clear()
http://git-wip-us.apache.org/repos/asf/ambari/blob/ccf9edbc/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 923c570..19c92b0 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -352,8 +352,9 @@ def run_threads(server_hostname, heartbeat_stop_callback):
while controller.is_alive():
time.sleep(0.1)
- if controller.get_status_commands_executor().need_relaunch:
- controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
+ need_relaunch, reason = controller.get_status_commands_executor().need_relaunch
+ if need_relaunch:
+ controller.get_status_commands_executor().relaunch(reason)
controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False)