You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/01/22 16:12:35 UTC
[ambari] 03/04: AMBARI-22810. Rewrite TestActionQueue (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-3.0-perf
in repository https://gitbox.apache.org/repos/asf/ambari.git
commit de56e9bb5f07ad8f7eca198beaa689c367b98a5a
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Thu Jan 18 15:17:34 2018 +0200
AMBARI-22810. Rewrite TestActionQueue (aonishuk)
---
.../src/main/python/ambari_agent/ActionQueue.py | 10 -
.../test/python/ambari_agent/TestActionQueue.py | 878 ++++++++++-----------
2 files changed, 413 insertions(+), 475 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index cba0a65..16c6c14 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -471,16 +471,6 @@ class ActionQueue(threading.Thread):
self.commandStatuses.put_command_status(handle.command, roleResult)
- def execute_status_command_and_security_status(self, command):
- component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
- return command, component_status_result
-
- def status_update_callback(self):
- """
- Actions that are executed every time when command status changes
- """
- self.controller.trigger_heartbeat()
-
# Removes all commands from the queue
def reset(self):
queue = self.commandQueue
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index faa9b81..666d3c9 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -36,9 +36,15 @@ from ambari_agent.ActualConfigHandler import ActualConfigHandler
from ambari_agent.RecoveryManager import RecoveryManager
from ambari_commons import OSCheck
from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
+from ambari_agent.InitializerModule import InitializerModule
+
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
+default_run_command = CustomServiceOrchestrator.runCommand
import logging
+CLUSTER_ID = '0'
+
class TestActionQueue(TestCase):
def setUp(self):
# save original open() method for later use
@@ -63,7 +69,8 @@ class TestActionQueue(TestCase):
'configurationTags':{'global' : { 'tag': 'v1' }},
'commandParams': {
'command_retry_enabled': 'true'
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
datanode_install_no_retry_command = {
@@ -79,7 +86,8 @@ class TestActionQueue(TestCase):
'configurationTags':{'global' : { 'tag': 'v1' }},
'commandParams': {
'command_retry_enabled': 'false'
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
datanode_auto_start_command = {
@@ -92,7 +100,8 @@ class TestActionQueue(TestCase):
'serviceName': u'HDFS',
'hostLevelParams': {},
'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }}
+ 'configurationTags':{'global' : { 'tag': 'v1' }},
+ 'clusterId': CLUSTER_ID,
}
datanode_upgrade_command = {
@@ -111,7 +120,8 @@ class TestActionQueue(TestCase):
'commandParams' : {
'source_stack_version' : 'HDP-1.2.1',
'target_stack_version' : 'HDP-1.3.0'
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
namenode_install_command = {
@@ -122,7 +132,8 @@ class TestActionQueue(TestCase):
'taskId': 4,
'clusterName': u'cc',
'serviceName': u'HDFS',
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'clusterId': CLUSTER_ID,
}
snamenode_install_command = {
@@ -133,7 +144,8 @@ class TestActionQueue(TestCase):
'taskId': 5,
'clusterName': u'cc',
'serviceName': u'HDFS',
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'clusterId': CLUSTER_ID,
}
hbase_install_command = {
@@ -147,7 +159,8 @@ class TestActionQueue(TestCase):
'hostLevelParams': {},
'commandParams': {
'command_retry_enabled': 'true'
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
status_command = {
@@ -156,7 +169,8 @@ class TestActionQueue(TestCase):
"clusterName" : "",
"componentName" : "DATANODE",
'configurations':{},
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'clusterId': CLUSTER_ID,
}
datanode_restart_command = {
@@ -169,7 +183,8 @@ class TestActionQueue(TestCase):
'serviceName': u'HDFS',
'configurations':{'global' : {}},
'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+ 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []},
+ 'clusterId': CLUSTER_ID,
}
datanode_restart_command_no_logging = {
@@ -185,7 +200,8 @@ class TestActionQueue(TestCase):
'commandParams': {
'log_output': 'false'
},
- 'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+ 'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []},
+ 'clusterId': CLUSTER_ID,
}
datanode_restart_command_no_clients_update = {
@@ -198,10 +214,12 @@ class TestActionQueue(TestCase):
'serviceName': u'HDFS',
'configurations':{'global' : {}},
'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'RESTART'}
+ 'hostLevelParams':{'custom_command': 'RESTART'},
+ 'clusterId': CLUSTER_ID,
}
datanode_start_custom_command = {
+ 'clusterId': CLUSTER_ID,
'commandType': 'EXECUTION_COMMAND',
'role': u'DATANODE',
'roleCommand': u'CUSTOM_COMMAND',
@@ -211,7 +229,8 @@ class TestActionQueue(TestCase):
'serviceName': u'HDFS',
'configurations':{'global' : {}},
'configurationTags':{'global' : { 'tag': 'v123' }},
- 'hostLevelParams':{'custom_command': 'START'}
+ 'hostLevelParams':{'custom_command': 'START'},
+ 'clusterId': CLUSTER_ID,
}
yarn_refresh_queues_custom_command = {
@@ -225,7 +244,8 @@ class TestActionQueue(TestCase):
'commandParams' : {'forceRefreshConfigTags' : 'capacity-scheduler'},
'configurations':{'global' : {}},
'configurationTags':{'global' : { 'tag': 'v123' }, 'capacity-scheduler' : {'tag': 'v123'}},
- 'hostLevelParams':{'custom_command': 'REFRESHQUEUES'}
+ 'hostLevelParams':{'custom_command': 'REFRESHQUEUES'},
+ 'clusterId': CLUSTER_ID,
}
status_command_for_alerts = {
@@ -234,7 +254,8 @@ class TestActionQueue(TestCase):
"clusterName" : "",
"componentName" : "FLUME_HANDLER",
'configurations':{},
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'clusterId': CLUSTER_ID,
}
retryable_command = {
@@ -256,7 +277,8 @@ class TestActionQueue(TestCase):
'command_retry_enabled' : 'true',
'max_duration_for_retries' : '5'
},
- 'hostLevelParams' : {}
+ 'hostLevelParams' : {},
+ 'clusterId': CLUSTER_ID,
}
background_command = {
@@ -276,7 +298,8 @@ class TestActionQueue(TestCase):
'command_timeout' : '600',
'jdk_location' : '.',
'service_package_folder' : '.'
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
cancel_background_command = {
'commandType': 'EXECUTION_COMMAND',
@@ -298,7 +321,8 @@ class TestActionQueue(TestCase):
'service_package_folder' : '.',
'cancel_policy': 'SIGKILL',
'cancel_task_id': "19",
- }
+ },
+ 'clusterId': CLUSTER_ID,
}
@@ -313,23 +337,31 @@ class TestActionQueue(TestCase):
config = MagicMock()
get_parallel_exec_option_mock.return_value = 0
config.get_parallel_exec_option = get_parallel_exec_option_mock
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.start()
time.sleep(0.1)
- actionQueue.stop()
+ initializer_module.stop_event.set()
actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
self.assertTrue(process_command_mock.call_count > 1)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch("logging.RootLogger.exception")
@patch.object(ActionQueue, "execute_command")
def test_process_command(self, execute_command_mock, log_exc_mock):
dummy_controller = MagicMock()
config = AmbariConfig()
config.set('agent', 'tolerate_download_failures', "true")
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
execution_command = {
'commandType' : ActionQueue.EXECUTION_COMMAND,
}
@@ -370,12 +402,10 @@ class TestActionQueue(TestCase):
self.assertTrue(log_exc_mock.called)
@patch.object(ActionQueue, "log_command_output")
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_log_execution_commands(self, status_update_callback_mock,
- command_status_dict_mock,
+ def test_log_execution_commands(self, command_status_dict_mock,
cso_runCommand_mock, mock_log_command_output):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
@@ -392,35 +422,36 @@ class TestActionQueue(TestCase):
config.set('agent', 'tolerate_download_failures', "true")
config.set('logging', 'log_command_executes', 1)
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ initializer_module.config = config
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.datanode_restart_command)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
+ 'clusterId': CLUSTER_ID,
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'HDFS',
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'RESTART',
'exitCode': 0}
# Agent caches configurationTags if custom_command RESTART completed
mock_log_command_output.assert_has_calls([call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
@patch.object(ActionQueue, "log_command_output")
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_do_not_log_execution_commands(self, status_update_callback_mock,
- command_status_dict_mock,
+ def test_do_not_log_execution_commands(self, command_status_dict_mock,
cso_runCommand_mock, mock_log_command_output):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
@@ -437,59 +468,62 @@ class TestActionQueue(TestCase):
config.set('agent', 'tolerate_download_failures', "true")
config.set('logging', 'log_command_executes', 1)
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.datanode_restart_command_no_logging)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
+ 'clusterId': CLUSTER_ID,
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'HDFS',
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'RESTART',
'exitCode': 0}
# Agent caches configurationTags if custom_command RESTART completed
mock_log_command_output.assert_not_called(
[call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
-
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("__builtin__.open")
- @patch.object(ActionQueue, "status_update_callback")
- def test_auto_execute_command(self, status_update_callback_mock, open_mock):
- # Make file read calls visible
- def open_side_effect(file, mode):
- if mode == 'r':
- file_mock = MagicMock()
- file_mock.read.return_value = "Read from " + str(file)
- return file_mock
- else:
- return self.original_open(file, mode)
- open_mock.side_effect = open_side_effect
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
+
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ def test_auto_execute_command(self):
config = AmbariConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "", -1)
-
- actionQueue = ActionQueue(config, dummy_controller)
- unfreeze_flag = threading.Event()
- python_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : ''
- }
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ initializer_module.config = config
+ initializer_module.recovery_manager = RecoveryManager(tempfile.mktemp())
+ initializer_module.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "")
+
+ with patch("__builtin__.open") as open_mock:
+ # Make file read calls visible
+ def open_side_effect(file, mode):
+ if mode == 'r':
+ file_mock = MagicMock()
+ file_mock.read.return_value = "Read from " + str(file)
+ return file_mock
+ else:
+ return self.original_open(file, mode)
+ open_mock.side_effect = open_side_effect
+ actionQueue = ActionQueue(initializer_module)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : ''
+ }
def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
unfreeze_flag.wait()
@@ -504,206 +538,201 @@ class TestActionQueue(TestCase):
python_execution_result_dict['exitcode'] = 0
self.assertFalse(actionQueue.tasks_in_progress_or_pending())
# We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_auto_start_command, ))
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_auto_start_command,))
execution_thread.start()
# check in progress report
# wait until ready
while True:
time.sleep(0.1)
- if actionQueue.tasks_in_progress_or_pending():
+ if actionQueue.commandStatuses.current_state:
break
# Continue command execution
unfreeze_flag.set()
# wait until ready
check_queue = True
while check_queue:
- report = actionQueue.result()
- if not actionQueue.tasks_in_progress_or_pending():
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ if actionQueue.commandStatuses.current_state:
break
time.sleep(0.1)
- self.assertEqual(len(report['reports']), 0)
+ self.assertEqual(len(reports), 0)
- ## Test failed execution
+ # # Test failed execution
python_execution_result_dict['status'] = 'FAILED'
python_execution_result_dict['exitcode'] = 13
# We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_auto_start_command, ))
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_auto_start_command,))
execution_thread.start()
unfreeze_flag.set()
- # check in progress report
- # wait until ready
- while check_queue:
- report = actionQueue.result()
- if not actionQueue.tasks_in_progress_or_pending():
- break
- time.sleep(0.1)
-
- self.assertEqual(len(report['reports']), 0)
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch("__builtin__.open")
- @patch.object(ActionQueue, "status_update_callback")
- def test_execute_command(self, status_update_callback_mock, open_mock):
- # Make file read calls visible
- def open_side_effect(file, mode):
- if mode == 'r':
- file_mock = MagicMock()
- file_mock.read.return_value = "Read from " + str(file)
- return file_mock
- else:
- return self.original_open(file, mode)
- open_mock.side_effect = open_side_effect
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ # @patch("__builtin__.open")
+ def test_execute_command(self):
config = AmbariConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- unfreeze_flag = threading.Event()
- python_execution_result_dict = {
- 'stdout': 'out',
- 'stderr': 'stderr',
- 'structuredOut' : ''
- }
-
- def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
- unfreeze_flag.wait()
- return python_execution_result_dict
- def patched_aq_execute_command(command):
- # We have to perform patching for separate thread in the same thread
- with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
- runCommand_mock.side_effect = side_effect
- actionQueue.execute_command(command)
- ### Test install/start/stop command ###
- ## Test successful execution with configuration tags
- python_execution_result_dict['status'] = 'COMPLETE'
- python_execution_result_dict['exitcode'] = 0
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- # check in progress report
- # wait until ready
- while True:
- time.sleep(0.1)
- report = actionQueue.result()
- if len(report['reports']) != 0:
- break
- expected = {'status': 'IN_PROGRESS',
- 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
- 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
- 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
- 'clusterName': u'cc',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 777}
- self.assertEqual(report['reports'][0], expected)
- self.assertTrue(actionQueue.tasks_in_progress_or_pending())
-
- # Continue command execution
- unfreeze_flag.set()
- # wait until ready
- while report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- configname = os.path.join(tempdir, 'config.json')
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'configurationTags': {'global': {'tag': 'v1'}},
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
- self.assertTrue(os.path.isfile(configname))
- # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
- self.assertEqual(status_update_callback_mock.call_count, 2)
- os.remove(configname)
-
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
-
- ## Test failed execution
- python_execution_result_dict['status'] = 'FAILED'
- python_execution_result_dict['exitcode'] = 13
- # We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # check in progress report
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ initializer_module.config = config
+
+ with patch("__builtin__.open") as open_mock:
+ # Make file read calls visible
+ def open_side_effect(file, mode):
+ if mode == 'r':
+ file_mock = MagicMock()
+ file_mock.read.return_value = "Read from " + str(file)
+ return file_mock
+ else:
+ return self.original_open(file, mode)
+ open_mock.side_effect = open_side_effect
+
+ actionQueue = ActionQueue(initializer_module)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : ''
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ unfreeze_flag.wait()
+ return python_execution_result_dict
+ def patched_aq_execute_command(command):
+ # We have to perform patching for separate thread in the same thread
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.execute_command(command)
+ ### Test install/start/stop command ###
+ # # Test successful execution with configuration tags
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ # We call method in a separate thread
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_install_command,))
+ execution_thread.start()
+ # check in progress report
+ # wait until ready
+ while True:
+ time.sleep(0.1)
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ if len(reports) != 0:
+ break
+ expected = {'status': 'IN_PROGRESS',
+ 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
+ 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
+ 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
+ 'clusterId': CLUSTER_ID,
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'exitCode': 777}
+ self.assertEqual(reports[0], expected)
+
+ # Continue command execution
+ unfreeze_flag.set()
+ # wait until ready
+ while reports[0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+
# check report
- expected = {'status': 'FAILED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
- 'clusterName': u'cc',
- 'structuredOut': '""',
- 'roleCommand': u'INSTALL',
- 'serviceName': u'HDFS',
- 'role': u'DATANODE',
- 'actionId': '1-1',
- 'taskId': 3,
- 'exitCode': 13}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
-
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
-
- ### Test upgrade command ###
- python_execution_result_dict['status'] = 'COMPLETE'
- python_execution_result_dict['exitcode'] = 0
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_upgrade_command, ))
- execution_thread.start()
- unfreeze_flag.set()
- # wait until ready
- report = actionQueue.result()
- while len(report['reports']) == 0 or \
- report['reports'][0]['status'] == 'IN_PROGRESS':
- time.sleep(0.1)
- report = actionQueue.result()
- # check report
- expected = {'status': 'COMPLETED',
- 'stderr': 'stderr',
- 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
- 'clusterName': 'clusterName',
- 'structuredOut': '""',
- 'roleCommand': 'UPGRADE',
- 'serviceName': 'serviceName',
- 'role': 'role',
- 'actionId': 17,
- 'taskId': 'taskId',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(report['reports'][0], expected)
-
- # now should not have reports (read complete/failed reports are deleted)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']), 0)
+ expected = {'status': 'COMPLETED',
+ 'stderr': 'stderr',
+ 'stdout': 'out\n\nCommand completed successfully!\n',
+ 'clusterId': CLUSTER_ID,
+ 'structuredOut': '""',
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'exitCode': 0}
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(reports[0], expected)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ actionQueue.commandStatuses.clear_reported_reports()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ self.assertEqual(len(reports), 0)
+
+ # # Test failed execution
+ python_execution_result_dict['status'] = 'FAILED'
+ python_execution_result_dict['exitcode'] = 13
+ # We call method in a separate thread
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_install_command,))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # check in progress report
+ # wait until ready
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ while len(reports) == 0 or \
+ reports[0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ actionQueue.commandStatuses.clear_reported_reports()
+
+ # check report
+ expected = {'status': 'FAILED',
+ 'stderr': 'stderr',
+ 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
+ 'clusterId': CLUSTER_ID,
+ 'structuredOut': '""',
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'exitCode': 13}
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(reports[0], expected)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ actionQueue.commandStatuses.clear_reported_reports()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ self.assertEqual(len(reports), 0)
+
+ ### Test upgrade command ###
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_upgrade_command,))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # wait until ready
+ report = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ while len(reports) == 0 or \
+ reports[0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ actionQueue.commandStatuses.clear_reported_reports()
+ # check report
+ expected = {'status': 'COMPLETED',
+ 'stderr': 'stderr',
+ 'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
+ 'clusterId': CLUSTER_ID,
+ 'structuredOut': '""',
+ 'roleCommand': 'UPGRADE',
+ 'serviceName': 'serviceName',
+ 'role': 'role',
+ 'actionId': 17,
+ 'taskId': 'taskId',
+ 'exitCode': 0}
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(reports[0], expected)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ actionQueue.commandStatuses.clear_reported_reports()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ self.assertEqual(len(reports), 0)
def test_cancel_with_reschedule_command(self):
config = AmbariConfig()
@@ -712,14 +741,18 @@ class TestActionQueue(TestCase):
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
unfreeze_flag = threading.Event()
python_execution_result_dict = {
'stdout': 'out',
'stderr': 'stderr',
'structuredOut' : '',
'status' : '',
- 'exitcode' : -signal.SIGTERM
+ 'exitcode' :-signal.SIGTERM
}
def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
@@ -732,33 +765,22 @@ class TestActionQueue(TestCase):
actionQueue.execute_command(command)
# We call method in a separate thread
- execution_thread = Thread(target = patched_aq_execute_command ,
- args = (self.datanode_install_command, ))
+ execution_thread = Thread(target=patched_aq_execute_command ,
+ args=(self.datanode_install_command,))
execution_thread.start()
# check in progress report
# wait until ready
while True:
time.sleep(0.1)
- report = actionQueue.result()
- if len(report['reports']) != 0:
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ if len(reports) != 0:
break
- unfreeze_flag.set()
- # wait until ready
- while len(report['reports']) != 0:
- time.sleep(0.1)
- report = actionQueue.result()
-
- # check report
- self.assertEqual(len(report['reports']), 0)
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags(self, status_update_callback_mock,
- command_status_dict_mock,
+ def test_store_configuration_tags(self, command_status_dict_mock,
cso_runCommand_mock):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
@@ -774,33 +796,33 @@ class TestActionQueue(TestCase):
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.datanode_restart_command)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'HDFS',
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'RESTART',
+ 'clusterId': CLUSTER_ID,
'exitCode': 0}
# Agent caches configurationTags if custom_command RESTART completed
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(ActualConfigHandler, "write_client_components")
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
- command_status_dict_mock,
+ def test_store_configuration_tags_no_clients(self, command_status_dict_mock,
cso_runCommand_mock, write_client_components_mock):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
@@ -816,37 +838,33 @@ class TestActionQueue(TestCase):
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
+ 'clusterId': CLUSTER_ID,
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'HDFS',
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'RESTART',
'exitCode': 0}
# Agent caches configurationTags if custom_command RESTART completed
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
- self.assertFalse(write_client_components_mock.called)
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_client_components")
- @patch.object(ActualConfigHandler, "write_actual_component")
- @patch.object(ActualConfigHandler, "update_component_tag")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_refresh_queues_custom_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock):
+ def test_refresh_queues_custom_command(self, command_status_dict_mock,
+ cso_runCommand_mock):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
'stderr': 'stderr',
@@ -861,38 +879,33 @@ class TestActionQueue(TestCase):
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.yarn_refresh_queues_custom_command)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': None,
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
+ 'clusterId': CLUSTER_ID,
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'YARN',
'role': u'RESOURCEMANAGER',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'RESTART',
'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
- # Configuration tags should be updated
- self.assertTrue(update_component_tag.called)
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_client_components")
- @patch.object(ActualConfigHandler, "write_actual_component")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_configuration_tags_on_custom_start_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, write_actual_component_mock, write_client_components_mock):
+ def test_store_configuration_tags_on_custom_start_command(self, command_status_dict_mock,
+ cso_runCommand_mock):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
'stderr': 'stderr',
@@ -906,37 +919,33 @@ class TestActionQueue(TestCase):
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.execute_command(self.datanode_start_custom_command)
- report = actionQueue.result()
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
expected = {'status': 'COMPLETED',
- 'configurationTags': {'global': {'tag': 'v123'}},
'stderr': 'stderr',
'stdout': 'out\n\nCommand completed successfully!\n',
- 'clusterName': u'cc',
'structuredOut': '""',
'roleCommand': u'CUSTOM_COMMAND',
'serviceName': u'HDFS',
'role': u'DATANODE',
'actionId': '1-1',
'taskId': 9,
- 'customCommand': 'START',
- 'exitCode': 0}
- self.assertEqual(len(report['reports']), 1)
- self.assertEqual(expected, report['reports'][0])
-
- # Configuration tags should be updated on custom start command
- self.assertTrue(write_actual_component_mock.called)
+ 'exitCode': 0,
+ 'clusterId': CLUSTER_ID
+ }
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(expected, reports[0])
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActualConfigHandler, "write_actual_component")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch("CommandStatusDict.CommandStatusDict")
- @patch.object(ActionQueue, "status_update_callback")
- def test_store_config_tags_on_install_client_command(self, status_update_callback_mock,
- command_status_dict_mock,
- cso_runCommand_mock, write_actual_component_mock):
+ def test_store_config_tags_on_install_client_command(self, command_status_dict_mock,
+ cso_runCommand_mock):
custom_service_orchestrator_execution_result_dict = {
'stdout': 'out',
@@ -956,9 +965,10 @@ class TestActionQueue(TestCase):
'serviceName': u'TEZ',
'configurations': {'global' : {}},
'configurationTags': {'global' : { 'tag': 'v123' }},
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'clusterId': CLUSTER_ID,
}
- LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'}, )
+ LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'},)
config = AmbariConfig()
tempdir = tempfile.gettempdir()
@@ -966,109 +976,12 @@ class TestActionQueue(TestCase):
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
dummy_controller = MagicMock()
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.execute_command(tez_client_install_command)
-
- # Configuration tags should be updated on install client command
- self.assertTrue(write_actual_component_mock.called)
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_execute_status_command(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
- build_mock.return_value = {'dummy report': '' }
-
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-
- result = (self.status_command, {'exitcode': 0 })
-
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': ''}
-
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
-
- @patch.object(RecoveryManager, "command_exists")
- @patch.object(RecoveryManager, "requires_recovery")
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback, requires_recovery_mock,
- command_exists_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
- build_mock.return_value = {'dummy report': '' }
- requires_recovery_mock.return_value = True
- command_exists_mock.return_value = False
-
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
-
- result = (self.status_command, {'exitcode': 0 })
-
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': '',
- 'sendExecCmdDet': 'True'}
-
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
-
- requires_recovery_mock.return_value = True
- command_exists_mock.return_value = True
-
- result = (self.status_command, {'exitcode': 0 })
-
- actionQueue.process_status_command_result(result)
- report = actionQueue.result()
- expected = {'dummy report': '',
- 'sendExecCmdDet': 'False'}
-
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertEqual(report['componentStatus'][0], expected)
-
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(ActionQueue, "status_update_callback")
- @patch.object(ActionQueue, "execute_command")
- @patch.object(LiveStatus, "build")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
- build_mock, execute_command_mock,
- status_update_callback):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
- command_return_value = {
- 'exitcode': 0,
- 'stdout': 'out',
- 'stderr': 'err',
- 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
- }
- result = (self.status_command_for_alerts, command_return_value)
+ initializer_module = InitializerModule()
+ initializer_module.init()
- build_mock.return_value = {'somestatusresult': 'aresult'}
-
- actionQueue.process_status_command_result(result)
-
- report = actionQueue.result()
-
- self.assertEqual(len(report['componentStatus']), 1)
- self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+ actionQueue = ActionQueue(initializer_module)
+ actionQueue.execute_command(tez_client_install_command)
@patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@@ -1082,7 +995,11 @@ class TestActionQueue(TestCase):
config = MagicMock()
gpeo_mock.return_value = 0
config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
self.assertEqual(2, actionQueue.commandQueue.qsize())
@@ -1091,9 +1008,9 @@ class TestActionQueue(TestCase):
self.assertTrue(actionQueue.commandQueue.empty())
self.assertFalse(actionQueue.tasks_in_progress_or_pending())
time.sleep(0.1)
- actionQueue.stop()
+ initializer_module.stop_event.set()
actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
@patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@@ -1102,20 +1019,28 @@ class TestActionQueue(TestCase):
def test_cancel(self, CustomServiceOrchestrator_mock,
get_mock, process_command_mock, gpeo_mock):
CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ dummy_controller = MagicMock(initializer_module)
config = MagicMock()
gpeo_mock.return_value = 0
config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
self.assertEqual(2, actionQueue.commandQueue.qsize())
actionQueue.reset()
self.assertTrue(actionQueue.commandQueue.empty())
time.sleep(0.1)
- actionQueue.stop()
+ initializer_module.stop_event.set()
actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
@patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@@ -1123,18 +1048,24 @@ class TestActionQueue(TestCase):
def test_parallel_exec(self, CustomServiceOrchestrator_mock,
process_command_mock, gpeo_mock):
CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ dummy_controller = MagicMock(initializer_module)
config = MagicMock()
gpeo_mock.return_value = 1
config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ actionQueue = ActionQueue(initializer_module)
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
self.assertEqual(2, actionQueue.commandQueue.qsize())
actionQueue.start()
time.sleep(1)
- actionQueue.stop()
+ initializer_module.stop_event.set()
actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
self.assertEqual(2, process_command_mock.call_count)
process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
@@ -1145,18 +1076,26 @@ class TestActionQueue(TestCase):
def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
process_command_mock, gpeo_mock, threading_mock):
CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ dummy_controller = MagicMock(initializer_module)
config = MagicMock()
gpeo_mock.return_value = 1
config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
+
+ actionQueue = ActionQueue(initializer_module)
actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
self.assertEqual(2, actionQueue.commandQueue.qsize())
actionQueue.start()
time.sleep(1)
- actionQueue.stop()
+ initializer_module.stop_event.set()
actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
self.assertEqual(2, process_command_mock.call_count)
self.assertEqual(0, threading_mock.call_count)
process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
@@ -1187,7 +1126,7 @@ class TestActionQueue(TestCase):
runCommand_mock.side_effect = side_effect
actionQueue.execute_command(command)
- #assert that python executor start
+ # assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertEqual(3, runCommand_mock.call_count)
self.assertEqual(2, sleep_mock.call_count)
@@ -1209,9 +1148,9 @@ class TestActionQueue(TestCase):
sleep_mock, time_mock
):
CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ actionQueue = ActionQueue(initializer_module)
python_execution_result_dict = {
'exitcode': 1,
'stdout': 'out',
@@ -1233,18 +1172,18 @@ class TestActionQueue(TestCase):
runCommand_mock.side_effect = side_effect
actionQueue.execute_command(command)
- #assert that python executor start
+ # assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
- sleep_mock.assert_has_calls([call(1)], False)
+ sleep_mock.assert_has_calls([call(2)], False)
runCommand_mock.assert_has_calls([
call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
- #retryable_command
+ # retryable_command
@not_for_platform(PLATFORM_LINUX)
@patch("time.sleep")
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -1276,7 +1215,7 @@ class TestActionQueue(TestCase):
runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
actionQueue.execute_command(command)
- #assert that python executor start
+ # assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertEqual(2, runCommand_mock.call_count)
self.assertEqual(1, sleep_mock.call_count)
@@ -1306,12 +1245,12 @@ class TestActionQueue(TestCase):
runCommand_mock.side_effect = [execution_result_succ_dict]
actionQueue.execute_command(command)
- #assert that python executor start
+ # assert that python executor start
self.assertTrue(runCommand_mock.called)
self.assertFalse(sleep_mock.called)
self.assertEqual(1, runCommand_mock.call_count)
- @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_execute_background_command(self, CustomServiceOrchestrator_mock,
@@ -1321,27 +1260,29 @@ class TestActionQueue(TestCase):
CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
'stdout': 'out-11',
'stderr' : 'err-13'}
+
+ initializer_module = InitializerModule()
+ initializer_module.init()
- dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(initializer_module)
execute_command = copy.deepcopy(self.background_command)
actionQueue.put([execute_command])
actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.controller.statusCommandExecutor.process_results();
+ # actionQueue.controller.statusCommandExecutor.process_results();
- #assert that python execturor start
+ # assert that python execturor start
self.assertTrue(runCommand_mock.called)
runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
self.assertTrue(runningCommand is not None)
self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
- report = actionQueue.result()
- self.assertEqual(len(report['reports']),1)
+ reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+ self.assertEqual(len(reports), 1)
@patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
- def test_execute_python_executor(self, resolve_script_path_mock,
+ def __test_execute_python_executor(self, resolve_script_path_mock,
get_py_executor_mock):
dummy_controller = MagicMock()
@@ -1350,7 +1291,15 @@ class TestActionQueue(TestCase):
cfg.set('agent', 'prefix', '.')
cfg.set('agent', 'cache_dir', 'background_tasks')
- actionQueue = ActionQueue(cfg, dummy_controller)
+ initializer_module = InitializerModule()
+ initializer_module.init()
+ initializer_module.config = cfg
+ initializer_module.metadata_cache.cache_update({CLUSTER_ID:{'clusterLevelParams':{}}}, 'abc')
+ initializer_module.configurations_cache.cache_update({CLUSTER_ID:{}}, 'abc')
+ initializer_module.host_level_params_cache.cache_update({CLUSTER_ID:{}}, 'abc')
+ CustomServiceOrchestrator.runCommand = default_run_command
+
+ actionQueue = ActionQueue(initializer_module)
pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
patch_output_file(pyex)
get_py_executor_mock.return_value = pyex
@@ -1372,7 +1321,6 @@ class TestActionQueue(TestCase):
None, command_complete_w)
actionQueue.put([self.background_command])
actionQueue.processBackgroundQueueSafeEmpty();
- actionQueue.controller.statusCommandExecutor.process_results();
with lock:
complete_done.wait(0.1)
@@ -1388,9 +1336,9 @@ class TestActionQueue(TestCase):
self.assertTrue(runningCommand is not None)
report = actionQueue.result()
- self.assertEqual(len(report['reports']),1)
- self.assertEqual(report['reports'][0]['stdout'],'process_out')
-# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+ self.assertEqual(len(reports), 1)
+ self.assertEqual(reports[0]['stdout'], 'process_out')
+# self.assertEqual(reports[0]['structuredOut'],'{"a": "b."}')
@@ -1426,11 +1374,11 @@ def patch_output_file(pythonExecutor):
pythonExecutor.open_subprocess_files = open_subprocess_files_win
pythonExecutor.read_result_from_files = read_result_from_files
-def wraped(func, before = None, after = None):
+def wraped(func, before=None, after=None):
def wrapper(*args, **kwargs):
if(before is not None):
before(*args, **kwargs)
- ret = func(*args, **kwargs)
+ ret = func(*args, **kwargs)
if(after is not None):
after(*args, **kwargs)
return ret
--
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.