You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/05/13 04:55:43 UTC
ambari git commit: AMBARI-11089. Auto recovery commands should not be
scheduled till the execution commands are complete
Repository: ambari
Updated Branches:
refs/heads/trunk 54e22bc94 -> e7fdb7146
AMBARI-11089. Auto recovery commands should not be scheduled till the execution commands are complete
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e7fdb714
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e7fdb714
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e7fdb714
Branch: refs/heads/trunk
Commit: e7fdb7146bc278aafd3d9530b7c9f824e4716cb2
Parents: 54e22bc
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Tue May 12 19:55:20 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Tue May 12 19:55:20 2015 -0700
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 30 +++++-
.../python/ambari_agent/CommandStatusDict.py | 5 +
.../src/main/python/ambari_agent/Controller.py | 2 +-
.../main/python/ambari_agent/RecoveryManager.py | 14 +++
.../test/python/ambari_agent/TestActionQueue.py | 99 +++++++++++++++++++-
.../python/ambari_agent/TestRecoveryManager.py | 14 ++-
6 files changed, 160 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index bdaefd0..59a5720 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -193,7 +193,13 @@ class ActionQueue(threading.Thread):
logger.debug("Took an element of Queue (command type = %s)." % commandType)
try:
if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
- self.execute_command(command)
+ try:
+ if self.controller.recovery_manager.enabled():
+ self.controller.recovery_manager.start_execution_command()
+ self.execute_command(command)
+ finally:
+ if self.controller.recovery_manager.enabled():
+ self.controller.recovery_manager.stop_execution_command()
elif commandType == self.STATUS_COMMAND:
self.execute_status_command(command)
else:
@@ -203,6 +209,15 @@ class ActionQueue(threading.Thread):
traceback.print_exc()
logger.warn(err)
+ def tasks_in_progress_or_pending(self):
+ return_val = False
+ if not self.commandQueue.empty():
+ return_val = True
+ if self.controller.recovery_manager.has_active_command():
+ return_val = True
+ return return_val
+ pass
+
def execute_command(self, command):
'''
Executes commands of type EXECUTION_COMMAND
@@ -248,6 +263,8 @@ class ActionQueue(threading.Thread):
maxAttempts = int(command['commandParams']['command_retry_max_attempt_count'])
if 'command_retry_enabled' in command['commandParams']:
retryAble = command['commandParams']['command_retry_enabled'] == "true"
+ if isAutoExecuteCommand:
+ retryAble = False
logger.debug("Command execution metadata - retry enabled = {retryAble}, max attempt count = {maxAttemptCount}".
format(retryAble = retryAble, maxAttemptCount = maxAttempts))
@@ -296,6 +313,17 @@ class ActionQueue(threading.Thread):
# let ambari know that configuration tags were applied
if status == self.COMPLETED_STATUS:
+ if self.controller.recovery_manager.enabled() and command.has_key('roleCommand'):
+ if command['roleCommand'] == self.ROLE_COMMAND_START:
+ self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+ elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
+ self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS)
+ elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND:
+ if command['hostLevelParams'].has_key('custom_command') and \
+ command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART:
+ self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+ pass
+
configHandler = ActualConfigHandler(self.config, self.configTags)
#update
if command.has_key('forceRefreshConfigTags') and len(command['forceRefreshConfigTags']) > 0 :
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 0ebc45e..861b568 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -78,6 +78,7 @@ class CommandStatusDict():
with self.lock:
c = copy.copy(self.current_state[taskId][1])
return c
+
def generate_report(self):
"""
Generates status reports about commands that are IN_PROGRESS, COMPLETE or
@@ -103,6 +104,10 @@ class CommandStatusDict():
resultComponentStatus.append(report)
# Component status is useful once, removing it
del self.current_state[key]
+ elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
+ logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
+ del self.current_state[key]
+ pass
result = {
'reports': resultReports,
'componentStatus': resultComponentStatus
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 9ebb83a..ccd1233 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -283,7 +283,7 @@ class Controller(threading.Thread):
self.recovery_manager.process_status_commands(response['statusCommands'])
self.addToStatusQueue(response['statusCommands'])
- if self.actionQueue.commandQueue.empty():
+ if not self.actionQueue.tasks_in_progress_or_pending():
recovery_commands = self.recovery_manager.get_recovery_commands()
for recovery_command in recovery_commands:
logger.info("Adding recovery command %s for component %s",
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index e2c5e98..12ba75d 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -88,12 +88,26 @@ class RecoveryManager:
self.statuses = {}
self.__status_lock = threading.RLock()
self.__command_lock = threading.RLock()
+ self.__active_command_lock = threading.RLock()
+ self.active_command_count = 0
self.paused = False
self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
pass
+ def start_execution_command(self):
+ with self.__active_command_lock:
+ self.active_command_count += 1
+ pass
+
+ def stop_execution_command(self):
+ with self.__active_command_lock:
+ self.active_command_count -= 1
+ pass
+
+ def has_active_command(self):
+ return self.active_command_count > 0
def set_paused(self, paused):
if self.paused != paused:
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index ee18c38..7a834d8 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -67,6 +67,19 @@ class TestActionQueue(TestCase):
'configurationTags':{'global' : { 'tag': 'v1' }}
}
+ datanode_auto_start_command = {
+ 'commandType': 'AUTO_EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'START',
+ 'commandId': '1-1',
+ 'taskId': 3,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {},
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v1' }}
+ }
+
datanode_upgrade_command = {
'commandId': 17,
'role' : "role",
@@ -311,6 +324,85 @@ class TestActionQueue(TestCase):
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch("__builtin__.open")
@patch.object(ActionQueue, "status_update_callback")
+ def test_auto_execute_command(self, status_update_callback_mock, open_mock):
+ # Make file read calls visible
+ def open_side_effect(file, mode):
+ if mode == 'r':
+ file_mock = MagicMock()
+ file_mock.read.return_value = "Read from " + str(file)
+ return file_mock
+ else:
+ return self.original_open(file, mode)
+ open_mock.side_effect = open_side_effect
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ dummy_controller.recovery_manager = RecoveryManager()
+ dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False)
+
+ actionQueue = ActionQueue(config, dummy_controller)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : ''
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ unfreeze_flag.wait()
+ return python_execution_result_dict
+ def patched_aq_execute_command(command):
+ # We have to perform patching for separate thread in the same thread
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.process_command(command)
+
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_auto_start_command, ))
+ execution_thread.start()
+ # check in progress report
+ # wait until ready
+ while True:
+ time.sleep(0.1)
+ if actionQueue.tasks_in_progress_or_pending():
+ break
+ # Continue command execution
+ unfreeze_flag.set()
+ # wait until ready
+ while actionQueue.tasks_in_progress_or_pending():
+ time.sleep(0.1)
+ report = actionQueue.result()
+
+ self.assertEqual(len(report['reports']), 0)
+
+ ## Test failed execution
+ python_execution_result_dict['status'] = 'FAILED'
+ python_execution_result_dict['exitcode'] = 13
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_auto_start_command, ))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # check in progress report
+ # wait until ready
+ report = actionQueue.result()
+ while actionQueue.tasks_in_progress_or_pending():
+ time.sleep(0.1)
+ report = actionQueue.result()
+
+ self.assertEqual(len(report['reports']), 0)
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch("__builtin__.open")
+ @patch.object(ActionQueue, "status_update_callback")
def test_execute_command(self, status_update_callback_mock, open_mock):
# Make file read calls visible
def open_side_effect(file, mode):
@@ -371,7 +463,9 @@ class TestActionQueue(TestCase):
'taskId': 3,
'exitCode': 777}
self.assertEqual(report['reports'][0], expected)
- # Continue command execution
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+
+ # Continue command execution
unfreeze_flag.set()
# wait until ready
while report['reports'][0]['status'] == 'IN_PROGRESS':
@@ -631,6 +725,7 @@ class TestActionQueue(TestCase):
get_mock, process_command_mock, gpeo_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
+ dummy_controller.recovery_manager = RecoveryManager()
config = MagicMock()
gpeo_mock.return_value = 0
config.get_parallel_exec_option = gpeo_mock
@@ -638,8 +733,10 @@ class TestActionQueue(TestCase):
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
self.assertEqual(2, actionQueue.commandQueue.qsize())
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
actionQueue.reset()
self.assertTrue(actionQueue.commandQueue.empty())
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
time.sleep(0.1)
actionQueue.stop()
actionQueue.join()
http://git-wip-us.apache.org/repos/asf/ambari/blob/e7fdb714/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index bd7c96b..aaf6e53 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -507,4 +507,16 @@ class TestRecoveryManager(TestCase):
commands = rm.get_recovery_commands()
self.assertEqual(1, len(commands))
self.assertEqual("START", commands[0]["roleCommand"])
- pass
\ No newline at end of file
+ pass
+
+ def test_command_count(self):
+ rm = RecoveryManager(True)
+ self.assertFalse(rm.has_active_command())
+ rm.start_execution_command()
+ self.assertTrue(rm.has_active_command())
+ rm.start_execution_command()
+ self.assertTrue(rm.has_active_command())
+ rm.stop_execution_command()
+ self.assertTrue(rm.has_active_command())
+ rm.stop_execution_command()
+ self.assertFalse(rm.has_active_command())
\ No newline at end of file