You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2016/10/03 09:33:39 UTC
ambari git commit: AMBARI-18505. Ambari Status commands should
enforce a timeout < heartbeat interval (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-2.4 bac68f745 -> 676bf8c91
AMBARI-18505. Ambari Status commands should enforce a timeout < heartbeat interval (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/676bf8c9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/676bf8c9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/676bf8c9
Branch: refs/heads/branch-2.4
Commit: 676bf8c913afbe302d3d598b8e4f25dde0cc9068
Parents: bac68f7
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Oct 3 12:33:32 2016 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Oct 3 12:33:32 2016 +0300
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 1 +
.../src/main/python/ambari_agent/ActionQueue.py | 22 ++++++++++++++++-
.../ambari_agent/PythonReflectiveExecutor.py | 25 +++++++++++++++-----
.../test/python/ambari_agent/TestActionQueue.py | 3 ++-
4 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/676bf8c9/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 914e09a..1c39c24 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -32,6 +32,7 @@ tolerate_download_failures=true
run_as_user=root
parallel_execution=0
alert_grace_period=5
+status_command_timeout=2
alert_kinit_timeout=14400000
system_resource_overrides=/etc/resource_overrides
; memory_threshold_soft_mb=400
http://git-wip-us.apache.org/repos/asf/ambari/blob/676bf8c9/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 064e4f0..32d6f84 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -29,6 +29,7 @@ import time
import signal
from AgentException import AgentException
+from PythonReflectiveExecutor import PythonReflectiveExecutor
from LiveStatus import LiveStatus
from ActualConfigHandler import ActualConfigHandler
from CommandStatusDict import CommandStatusDict
@@ -81,9 +82,11 @@ class ActionQueue(threading.Thread):
self.controller = controller
self.configTags = {}
self._stop = threading.Event()
+ self.hangingStatusCommands = {}
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
self.parallel_execution = config.get_parallel_exec_option()
+ self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
@@ -224,7 +227,24 @@ class ActionQueue(threading.Thread):
if self.controller.recovery_manager.enabled():
self.controller.recovery_manager.stop_execution_command()
elif commandType == self.STATUS_COMMAND:
- self.execute_status_command(command)
+ component_name = command['componentName']
+
+ if component_name in self.hangingStatusCommands and not self.hangingStatusCommands[component_name].isAlive():
+ del self.hangingStatusCommands[component_name]
+
+ if not component_name in self.hangingStatusCommands:
+ thread = threading.Thread(target = self.execute_status_command, args = (command,))
+ thread.daemon = True # hanging status commands should not prevent ambari-agent from stopping
+ thread.start()
+ thread.join(timeout=self.status_command_timeout)
+
+ if thread.isAlive():
+ # Force context to reset to normal. By context we mean sys.path, imports, logger setting, etc. They are set by specific status command, and are not relevant to ambari-agent.
+ PythonReflectiveExecutor.last_context.revert()
+ logger.warn("Command {0} for {1} is running for more than {2} seconds. Skipping it for current pack of status commands.".format(commandType, component_name, self.status_command_timeout))
+ self.hangingStatusCommands[component_name] = thread
+ else:
+ logger.info("Not running {0} for {1}, because previous one is still running.".format(commandType, component_name))
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
http://git-wip-us.apache.org/repos/asf/ambari/blob/676bf8c9/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
index 655b2fc..b476671 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonReflectiveExecutor.py
@@ -53,7 +53,9 @@ class PythonReflectiveExecutor(PythonExecutor):
returncode = 1
try:
- with PythonContext(script_dir, pythonCommand):
+ current_context = PythonContext(script_dir, pythonCommand)
+ PythonReflectiveExecutor.last_context = current_context
+ with current_context:
imp.load_source('__main__', script)
except SystemExit as e:
returncode = e.code
@@ -62,7 +64,10 @@ class PythonReflectiveExecutor(PythonExecutor):
except (ClientComponentHasNoStatus, ComponentIsNotRunning):
logger.debug("Reflective command failed with exception:", exc_info=1)
except Exception:
- logger.info("Reflective command failed with exception:", exc_info=1)
+ if current_context.is_forced_revert:
+ logger.info("Hanging status command finished its execution")
+ else:
+ logger.info("Reflective command failed with exception:", exc_info=1)
else:
returncode = 0
@@ -76,6 +81,8 @@ class PythonContext:
def __init__(self, script_dir, pythonCommand):
self.script_dir = script_dir
self.pythonCommand = pythonCommand
+ self.is_reverted = False
+ self.is_forced_revert = False
def __enter__(self):
self.old_sys_path = copy.copy(sys.path)
@@ -88,12 +95,18 @@ class PythonContext:
sys.argv = self.pythonCommand[1:]
def __exit__(self, exc_type, exc_val, exc_tb):
- sys.path = self.old_sys_path
- sys.argv = self.old_agv
- logging.disable(self.old_logging_disable)
- self.revert_sys_modules(self.old_sys_modules)
+ self.revert(is_forced_revert=False)
return False
+ def revert(self, is_forced_revert=True):
+ if not self.is_reverted:
+ self.is_forced_revert = is_forced_revert
+ self.is_reverted = True
+ sys.path = self.old_sys_path
+ sys.argv = self.old_agv
+ logging.disable(self.old_logging_disable)
+ self.revert_sys_modules(self.old_sys_modules)
+
def revert_sys_modules(self, value):
sys.modules.update(value)
http://git-wip-us.apache.org/repos/asf/ambari/blob/676bf8c9/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 1805c9a..b048829 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -212,6 +212,7 @@ class TestActionQueue(TestCase):
retryable_command = {
'commandType': 'EXECUTION_COMMAND',
'role': 'NAMENODE',
+ 'componentName': 'NAMENODE',
'roleCommand': 'INSTALL',
'commandId': '1-1',
'taskId': 19,
@@ -309,6 +310,7 @@ class TestActionQueue(TestCase):
}
status_command = {
'commandType' : ActionQueue.STATUS_COMMAND,
+ 'componentName': 'NAMENODE'
}
wrong_command = {
'commandType' : "SOME_WRONG_COMMAND",
@@ -1068,7 +1070,6 @@ class TestActionQueue(TestCase):
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
- sleep_mock.assert_has_calls([call(1)], False)
runCommand_mock.assert_has_calls([
call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),