You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/04/17 20:17:58 UTC
[04/34] ambari git commit: AMBARI-20750. Remove multiprocessing-based
StatusCommandsExecutor (Eugene Chekanskiy via smohanty)
AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ef0c99a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ef0c99a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ef0c99a
Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb
Parents: ef34cb4
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Apr 12 12:35:15 2017 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Apr 12 12:35:15 2017 -0700
----------------------------------------------------------------------
.../ambari_agent/StatusCommandsExecutor.py | 279 +------------------
1 file changed, 2 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5ef0c99a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 142e7ca..f42e134 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -83,280 +83,5 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
def kill(self, reason=None, can_relaunch=True):
pass
-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
- def __init__(self, config, actionQueue):
- self.config = config
- self.actionQueue = actionQueue
-
- self.can_relaunch = True
-
- # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
- # old and new queues
- self.usage_lock = threading.RLock()
- # protects against simultaneous killing/creating from different threads.
- self.kill_lock = threading.RLock()
-
- self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
- self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-
- self.worker_process = None
- self.mustDieEvent = multiprocessing.Event()
- self.timedOutEvent = multiprocessing.Event()
-
- # multiprocessing stuff that need to be cleaned every time
- self.mp_result_queue = multiprocessing.Queue()
- self.mp_result_logs = multiprocessing.Queue()
- self.mp_task_queue = multiprocessing.Queue()
-
- def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
- """
- Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
- extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
- than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
- ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
- ~4500 ``range(1,10000)`` objects for 5 seconds. So don't fill queue too fast.
-
- :param target_queue: queue to read from
- :param max_time: maximum time to spend in this method call
- :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
- :param read_break: time to wait before next read cycle iteration
- :return: list of resulting objects
- """
- results = []
- _empty = 0
- _start = time.time()
- with self.usage_lock:
- try:
- while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
- try:
- results.append(target_queue.get(False))
- _empty = 0
- time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
- except Queue.Empty:
- _empty += 1
- except IOError:
- pass
- except UnicodeDecodeError:
- pass
- except IOError:
- pass
- return results
-
- def _log_message(self, level, message, exception=None):
- """
- Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
- :param level:
- :param message:
- :param exception:
- :return:
- """
- result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
- self.mp_result_logs.put((level, result_message, exception))
-
- def _process_logs(self):
- """
- Get all available at this moment logs and prints them to logger.
- """
- for level, message, exception in self._drain_queue(self.mp_result_logs):
- if level == logging.ERROR:
- logger.debug(message, exc_info=exception)
- if level == logging.WARN:
- logger.warn(message)
- if level == logging.INFO:
- logger.info(message)
-
- def _worker_process_target(self):
- """
- Internal method that running in separate process.
- """
- # cleanup monkey-patching results in child process, as it causing problems
- import subprocess
- reload(subprocess)
- import multiprocessing
- reload(multiprocessing)
-
- bind_debug_signal_handlers()
- self._log_message(logging.INFO, "StatusCommandsExecutor process started")
-
- # region StatusCommandsExecutor process internals
- internal_in_queue = Queue.Queue()
- internal_out_queue = Queue.Queue()
-
- def _internal_worker():
- """
- thread that actually executes status commands
- """
- while True:
- _cmd = internal_in_queue.get()
- internal_out_queue.put(self.actionQueue.execute_status_command_and_security_status(_cmd))
-
- worker = threading.Thread(target=_internal_worker)
- worker.daemon = True
- worker.start()
-
- def _internal_process_command(_command):
- internal_in_queue.put(_command)
- start_time = time.time()
- result = None
- while not self.mustDieEvent.is_set() and not result and time.time() - start_time < self.status_command_timeout:
- try:
- result = internal_out_queue.get(timeout=1)
- except Queue.Empty:
- pass
-
- if result:
- self.mp_result_queue.put(result)
- return True
- else:
- # do not set timed out event twice
- if not self.timedOutEvent.is_set():
- self._set_timed_out(_command)
- return False
-
- # endregion
-
- try:
- while not self.mustDieEvent.is_set():
- try:
- command = self.mp_task_queue.get(False)
- except Queue.Empty:
- # no command, lets try in other loop iteration
- time.sleep(.1)
- continue
-
- self._log_message(logging.DEBUG, "Running status command for {0}".format(command['componentName']))
-
- if _internal_process_command(command):
- self._log_message(logging.DEBUG, "Completed status command for {0}".format(command['componentName']))
-
- except Exception as e:
- self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e)
- raise
-
- self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished")
-
- def _set_timed_out(self, command):
- """
- Set timeout event and adding log entry for given command.
-
- :param command:
- :return:
- """
- msg = "Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(
- command['commandType'],
- command['componentName'],
- self.status_command_timeout
- )
- self._log_message(logging.WARN, msg)
- self.timedOutEvent.set()
-
- def put_commands(self, commands):
- """
- Put given commands to command executor.
-
- :param commands: status commands to execute
- :return:
- """
- with self.usage_lock:
- for command in commands:
- logger.info("Adding " + command['commandType'] + " for component " + \
- command['componentName'] + " of service " + \
- command['serviceName'] + " of cluster " + \
- command['clusterName'] + " to the queue.")
- self.mp_task_queue.put(command)
- logger.debug(pprint.pformat(command))
-
- def process_results(self):
- """
- Process all the results from the SCE worker process.
- """
- self._process_logs()
- results = self._drain_queue(self.mp_result_queue)
- logger.debug("Drained %s status commands results, ~%s remains in queue", len(results), self.mp_result_queue.qsize())
- for result in results:
- try:
- self.actionQueue.process_status_command_result(result)
- except UnicodeDecodeError:
- pass
-
- @property
- def need_relaunch(self):
- """
- Indicates if process need to be relaunched due to timeout or it is dead or even was not created.
-
- :return: tuple (bool, str|None) with flag to relaunch and reason of relaunch
- """
- if not self.worker_process or not self.worker_process.is_alive():
- return True, "WORKER_DEAD"
- elif self.timedOutEvent.is_set():
- return True, "COMMAND_TIMEOUT"
- return False, None
-
- def relaunch(self, reason=None):
- """
- Restart status command executor internal process.
-
- :param reason: reason of restart
- :return:
- """
- with self.kill_lock:
- logger.info("Relaunching child process reason:" + str(reason))
- if self.can_relaunch:
- self.kill(reason)
- self.worker_process = multiprocessing.Process(target=self._worker_process_target)
- self.worker_process.start()
- logger.info("Started process with pid {0}".format(self.worker_process.pid))
- else:
- logger.debug("Relaunch does not allowed, can not relaunch")
-
- def kill(self, reason=None, can_relaunch=True):
- """
- Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to
- unblock threads that probably blocked on read or write operations to queues. Must be called from threads different
- from threads that calling read or write methods(get_log_messages, get_results, put_commands).
-
- :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill
- :param reason: reason of killing
- :return:
- """
- with self.kill_lock:
- self.can_relaunch = can_relaunch
-
- if not self.can_relaunch:
- logger.info("Killing without possibility to relaunch...")
-
- # try graceful stop, otherwise hard-kill
- if self.worker_process and self.worker_process.is_alive():
- self.mustDieEvent.set()
- self.worker_process.join(timeout=3)
- if self.worker_process.is_alive():
- os.kill(self.worker_process.pid, signal.SIGKILL)
- logger.info("Child process killed by -9")
- else:
- # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases
- # this call will do nothing, as all logs will be processed in ActionQueue loop
- self._process_logs()
- logger.info("Child process died gracefully")
- else:
- logger.info("Child process already dead")
-
- # close queues and acquire usage lock
- # closing both sides of pipes here, we need this hack in case of blocking on recv() call
- self.mp_result_queue.close()
- self.mp_result_queue._writer.close()
- self.mp_result_logs.close()
- self.mp_result_logs._writer.close()
- self.mp_task_queue.close()
- self.mp_task_queue._writer.close()
-
- with self.usage_lock:
- self.mp_result_queue.join_thread()
- self.mp_result_queue = multiprocessing.Queue()
- self.mp_task_queue.join_thread()
- self.mp_task_queue = multiprocessing.Queue()
- self.mp_result_logs.join_thread()
- self.mp_result_logs = multiprocessing.Queue()
- self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
- self.mustDieEvent.clear()
- self.timedOutEvent.clear()
+# TODO make reliable MultiProcessStatusCommandsExecutor implementation
+MultiProcessStatusCommandsExecutor = SingleProcessStatusCommandsExecutor