You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/10/03 09:29:19 UTC

[2/2] ambari git commit: AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)

AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a660c490
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a660c490
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a660c490

Branch: refs/heads/branch-2.5
Commit: a660c490618b59896b893b49745688746773605e
Parents: df9854c
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Oct 3 12:28:49 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Oct 3 12:28:49 2016 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 +
 .../src/main/python/ambari_agent/ActionQueue.py | 22 ++++++++++++++++-
 .../ambari_agent/PythonReflectiveExecutor.py    | 25 +++++++++++++++-----
 .../test/python/ambari_agent/TestActionQueue.py |  3 ++-
 4 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a660c490/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 914e09a..1c39c24 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -32,6 +32,7 @@ tolerate_download_failures=true
 run_as_user=root
 parallel_execution=0
 alert_grace_period=5
+status_command_timeout=2
 alert_kinit_timeout=14400000
 system_resource_overrides=/etc/resource_overrides
 ; memory_threshold_soft_mb=400

http://git-wip-us.apache.org/repos/asf/ambari/blob/a660c490/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index f104939..86918e5 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -29,6 +29,7 @@ import time
 import signal
 
 from AgentException import AgentException
+from PythonReflectiveExecutor import PythonReflectiveExecutor
 from LiveStatus import LiveStatus
 from ActualConfigHandler import ActualConfigHandler
 from CommandStatusDict import CommandStatusDict
@@ -82,9 +83,11 @@ class ActionQueue(threading.Thread):
     self.controller = controller
     self.configTags = {}
     self._stop = threading.Event()
+    self.hangingStatusCommands = {}
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
     self.parallel_execution = config.get_parallel_exec_option()
+    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
 
@@ -225,7 +228,24 @@ class ActionQueue(threading.Thread):
           if self.controller.recovery_manager.enabled():
             self.controller.recovery_manager.stop_execution_command()
       elif commandType == self.STATUS_COMMAND:
-        self.execute_status_command(command)
+        component_name = command['componentName']
+
+        if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
+          del self.hangingStatusCommands[component_name]
+
+        if not component_name in self.hangingStatusCommands:
+          thread = threading.Thread(target = self.execute_status_command, args = (command,))
+          thread.daemon = True # hanging status commands should not prevent ambari-agent from stopping
+          thread.start()
+          thread.join(timeout=self.status_command_timeout)
+
+          if thread.isAlive():
+            # Force context to reset to normal. By context we mean sys.path, imports, logger settings, etc. They are set by the specific status command, and are not relevant to ambari-agent.
+            PythonReflectiveExecutor.last_context.revert()
+            logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
+            self.hangingStatusCommands[component_name] = thread
+        else:
+          logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:

http://git-wip-us.apache.org/repos/asf/ambari/blob/a660c490/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index 655b2fc..b476671 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,7 +53,9 @@ class PythonReflectiveExecutor(PythonExecutor):
     returncode = 1
 
     try:
-      with PythonContext(script_dir, pythonCommand):
+      current_context = PythonContext(script_dir, pythonCommand)
+      PythonReflectiveExecutor.last_context = current_context
+      with current_context:
         imp.load_source('__main__', script)
     except SystemExit as e:
       returncode = e.code
@@ -62,7 +64,10 @@ class PythonReflectiveExecutor(PythonExecutor):
     except (ClientComponentHasNoStatus, ComponentIsNotRunning):
       logger.debug("Reflective command failed with exception:", exc_info=1)
     except Exception:
-      logger.info("Reflective command failed with exception:", exc_info=1)
+      if current_context.is_forced_revert:
+        logger.info("Hanging status command finished its execution")
+      else:
+        logger.info("Reflective command failed with exception:", exc_info=1)
     else: 
       returncode = 0
       
@@ -76,6 +81,8 @@ class PythonContext:
   def __init__(self, script_dir, pythonCommand):
     self.script_dir = script_dir
     self.pythonCommand = pythonCommand
+    self.is_reverted = False
+    self.is_forced_revert = False
     
   def __enter__(self):
     self.old_sys_path = copy.copy(sys.path)
@@ -88,12 +95,18 @@ class PythonContext:
     sys.argv = self.pythonCommand[1:]
 
   def __exit__(self, exc_type, exc_val, exc_tb):
-    sys.path = self.old_sys_path
-    sys.argv = self.old_agv
-    logging.disable(self.old_logging_disable)
-    self.revert_sys_modules(self.old_sys_modules)
+    self.revert(is_forced_revert=False)
     return False
   
+  def revert(self, is_forced_revert=True):
+    if not self.is_reverted:
+      self.is_forced_revert = is_forced_revert
+      self.is_reverted = True
+      sys.path = self.old_sys_path
+      sys.argv = self.old_agv
+      logging.disable(self.old_logging_disable)
+      self.revert_sys_modules(self.old_sys_modules)
+
   def revert_sys_modules(self, value):
     sys.modules.update(value)
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/a660c490/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 7d04d42..32773b8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -225,6 +225,7 @@ class TestActionQueue(TestCase):
   retryable_command = {
     'commandType': 'EXECUTION_COMMAND',
     'role': 'NAMENODE',
+    'componentName': 'NAMENODE',
     'roleCommand': 'INSTALL',
     'commandId': '1-1',
     'taskId': 19,
@@ -322,6 +323,7 @@ class TestActionQueue(TestCase):
     }
     status_command = {
       'commandType' : ActionQueue.STATUS_COMMAND,
+      'componentName': 'NAMENODE'
     }
     wrong_command = {
       'commandType' : "SOME_WRONG_COMMAND",
@@ -1126,7 +1128,6 @@ class TestActionQueue(TestCase):
     self.assertTrue(runCommand_mock.called)
     self.assertEqual(2, runCommand_mock.call_count)
     self.assertEqual(1, sleep_mock.call_count)
-    sleep_mock.assert_has_calls([call(1)], False)
     runCommand_mock.assert_has_calls([
       call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
            os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),