You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2016/04/12 00:46:22 UTC
ambari git commit: AMBARI-15795. Parallel execution should only be
allowed on commands that have auto retry enabled (smohanty)
Repository: ambari
Updated Branches:
refs/heads/trunk ca7b901e7 -> 1a7c781ce
AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1a7c781c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1a7c781c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1a7c781c
Branch: refs/heads/trunk
Commit: 1a7c781ce5340e332dc182c2c93d010cf0eec902
Parents: ca7b901
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Mon Apr 11 15:45:53 2016 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Mon Apr 11 15:45:59 2016 -0700
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 22 +-
.../test/python/ambari_agent/TestActionQueue.py | 48 +-
.../python/ambari_agent/TestActionQueue.py.orig | 1158 ++++++++++++++++++
3 files changed, 1221 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index d603566..ccae62c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -156,11 +156,23 @@ class ActionQueue(threading.Thread):
# commands using separate threads
while (True):
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- logger.info("Kicking off a thread for the command, id=" +
- str(command['commandId']) + " taskId=" + str(command['taskId']))
- t = threading.Thread(target=self.process_command, args=(command,))
- t.daemon = True
- t.start()
+ # If the command is not retry-enabled then do not start it in parallel.
+ # Checking just one command is enough, as all commands for a stage are sent
+ # at the same time and retry is only enabled for the initial start/install.
+ retryAble = False
+ if 'command_retry_enabled' in command['commandParams']:
+ retryAble = command['commandParams']['command_retry_enabled'] == "true"
+ if retryAble:
+ logger.info("Kicking off a thread for the command, id=" +
+ str(command['commandId']) + " taskId=" + str(command['taskId']))
+ t = threading.Thread(target=self.process_command, args=(command,))
+ t.daemon = True
+ t.start()
+ else:
+ self.process_command(command)
+ break;
+ pass
+ pass
except (Queue.Empty):
pass
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 8bd5ddc..2adf4ed 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -59,7 +59,26 @@ class TestActionQueue(TestCase):
'serviceName': u'HDFS',
'hostLevelParams': {},
'configurations':{'global' : {}},
- 'configurationTags':{'global' : { 'tag': 'v1' }}
+ 'configurationTags':{'global' : { 'tag': 'v1' }},
+ 'commandParams': {
+ 'command_retry_enabled': 'true'
+ }
+ }
+
+ datanode_install_no_retry_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 3,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {},
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v1' }},
+ 'commandParams': {
+ 'command_retry_enabled': 'false'
+ }
}
datanode_auto_start_command = {
@@ -124,8 +143,11 @@ class TestActionQueue(TestCase):
'taskId': 7,
'clusterName': u'cc',
'serviceName': u'HDFS',
- 'hostLevelParams': {}
+ 'hostLevelParams': {},
+ 'commandParams': {
+ 'command_retry_enabled': 'true'
}
+ }
status_command = {
"serviceName" : 'HDFS',
@@ -883,6 +905,28 @@ class TestActionQueue(TestCase):
self.assertEqual(2, process_command_mock.call_count)
process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
+ @patch("threading.Thread")
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
+ process_command_mock, gpeo_mock, threading_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ gpeo_mock.return_value = 1
+ config.get_parallel_exec_option = gpeo_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ actionQueue.start()
+ time.sleep(1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(1, process_command_mock.call_count)
+ self.assertEqual(0, threading_mock.call_count)
+ process_command_mock.assert_any_calls([call(self.datanode_install_no_retry_command), call(self.snamenode_install_command)])
@not_for_platform(PLATFORM_LINUX)
@patch("time.sleep")
http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
new file mode 100644
index 0000000..8bd5ddc
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
@@ -0,0 +1,1158 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from Queue import Queue
+
+from unittest import TestCase
+from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+import os, errno, time, pprint, tempfile, threading
+import sys
+from threading import Thread
+import copy
+
+from mock.mock import patch, MagicMock, call
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
+from ambari_agent.PythonExecutor import PythonExecutor
+from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.RecoveryManager import RecoveryManager
+from ambari_commons import OSCheck
+from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
+
+import logging
+
+class TestActionQueue(TestCase):
+ def setUp(self):
+ # save original open() method for later use
+ self.original_open = open
+
+
+ def tearDown(self):
+ sys.stdout = sys.__stdout__
+
+ logger = logging.getLogger()
+
+ datanode_install_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 3,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {},
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v1' }}
+ }
+
+ datanode_auto_start_command = {
+ 'commandType': 'AUTO_EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'START',
+ 'commandId': '1-1',
+ 'taskId': 3,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {},
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v1' }}
+ }
+
+ datanode_upgrade_command = {
+ 'commandId': 17,
+ 'role' : "role",
+ 'taskId' : "taskId",
+ 'clusterName' : "clusterName",
+ 'serviceName' : "serviceName",
+ 'roleCommand' : 'UPGRADE',
+ 'hostname' : "localhost.localdomain",
+ 'hostLevelParams': {},
+ 'clusterHostInfo': "clusterHostInfo",
+ 'commandType': "EXECUTION_COMMAND",
+ 'configurations':{'global' : {}},
+ 'roleParams': {},
+ 'commandParams' : {
+ 'source_stack_version' : 'HDP-1.2.1',
+ 'target_stack_version' : 'HDP-1.3.0'
+ }
+ }
+
+ namenode_install_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'NAMENODE',
+ 'roleCommand': u'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 4,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {}
+ }
+
+ snamenode_install_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'SECONDARY_NAMENODE',
+ 'roleCommand': u'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 5,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {}
+ }
+
+ hbase_install_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'HBASE',
+ 'roleCommand': u'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 7,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'hostLevelParams': {}
+ }
+
+ status_command = {
+ "serviceName" : 'HDFS',
+ "commandType" : "STATUS_COMMAND",
+ "clusterName" : "",
+ "componentName" : "DATANODE",
+ 'configurations':{},
+ 'hostLevelParams': {}
+ }
+
+ datanode_restart_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'CUSTOM_COMMAND',
+ 'commandId': '1-1',
+ 'taskId': 9,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
+ 'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+ }
+
+ datanode_restart_command_no_clients_update = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': u'DATANODE',
+ 'roleCommand': u'CUSTOM_COMMAND',
+ 'commandId': '1-1',
+ 'taskId': 9,
+ 'clusterName': u'cc',
+ 'serviceName': u'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
+ 'hostLevelParams':{'custom_command': 'RESTART'}
+ }
+
+ status_command_for_alerts = {
+ "serviceName" : 'FLUME',
+ "commandType" : "STATUS_COMMAND",
+ "clusterName" : "",
+ "componentName" : "FLUME_HANDLER",
+ 'configurations':{},
+ 'hostLevelParams': {}
+ }
+
+ retryable_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': 'NAMENODE',
+ 'roleCommand': 'INSTALL',
+ 'commandId': '1-1',
+ 'taskId': 19,
+ 'clusterName': 'c1',
+ 'serviceName': 'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
+ 'commandParams' : {
+ 'script_type' : 'PYTHON',
+ 'script' : 'script.py',
+ 'command_timeout' : '600',
+ 'jdk_location' : '.',
+ 'service_package_folder' : '.',
+ 'command_retry_enabled' : 'true',
+ 'max_duration_for_retries' : '5'
+ },
+ 'hostLevelParams' : {}
+ }
+
+ background_command = {
+ 'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+ 'role': 'NAMENODE',
+ 'roleCommand': 'CUSTOM_COMMAND',
+ 'commandId': '1-1',
+ 'taskId': 19,
+ 'clusterName': 'c1',
+ 'serviceName': 'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : { 'tag': 'v123' }},
+ 'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
+ 'commandParams' : {
+ 'script_type' : 'PYTHON',
+ 'script' : 'script.py',
+ 'command_timeout' : '600',
+ 'jdk_location' : '.',
+ 'service_package_folder' : '.'
+ }
+ }
+ cancel_background_command = {
+ 'commandType': 'EXECUTION_COMMAND',
+ 'role': 'NAMENODE',
+ 'roleCommand': 'ACTIONEXECUTE',
+ 'commandId': '1-1',
+ 'taskId': 20,
+ 'clusterName': 'c1',
+ 'serviceName': 'HDFS',
+ 'configurations':{'global' : {}},
+ 'configurationTags':{'global' : {}},
+ 'hostLevelParams':{},
+ 'commandParams' : {
+ 'script_type' : 'PYTHON',
+ 'script' : 'cancel_background_task.py',
+ 'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
+ 'jdk_location' : '.',
+ 'command_timeout' : '600',
+ 'service_package_folder' : '.',
+ 'cancel_policy': 'SIGKILL',
+ 'cancel_task_id': "19",
+ }
+ }
+
+
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(Queue, "get")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
+ get_mock, process_command_mock, get_parallel_exec_option_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ get_parallel_exec_option_mock.return_value = 0
+ config.get_parallel_exec_option = get_parallel_exec_option_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.start()
+ time.sleep(0.1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertTrue(process_command_mock.call_count > 1)
+
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch("traceback.print_exc")
+ @patch.object(ActionQueue, "execute_command")
+ @patch.object(ActionQueue, "execute_status_command")
+ def test_process_command(self, execute_status_command_mock,
+ execute_command_mock, print_exc_mock):
+ dummy_controller = MagicMock()
+ config = AmbariConfig()
+ config.set('agent', 'tolerate_download_failures', "true")
+ actionQueue = ActionQueue(config, dummy_controller)
+ execution_command = {
+ 'commandType' : ActionQueue.EXECUTION_COMMAND,
+ }
+ status_command = {
+ 'commandType' : ActionQueue.STATUS_COMMAND,
+ }
+ wrong_command = {
+ 'commandType' : "SOME_WRONG_COMMAND",
+ }
+ # Try wrong command
+ actionQueue.process_command(wrong_command)
+ self.assertFalse(execute_command_mock.called)
+ self.assertFalse(execute_status_command_mock.called)
+ self.assertFalse(print_exc_mock.called)
+
+ execute_command_mock.reset_mock()
+ execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
+ # Try normal execution
+ actionQueue.process_command(execution_command)
+ self.assertTrue(execute_command_mock.called)
+ self.assertFalse(execute_status_command_mock.called)
+ self.assertFalse(print_exc_mock.called)
+
+ execute_command_mock.reset_mock()
+ execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
+
+ actionQueue.process_command(status_command)
+ self.assertFalse(execute_command_mock.called)
+ self.assertTrue(execute_status_command_mock.called)
+ self.assertFalse(print_exc_mock.called)
+
+ execute_command_mock.reset_mock()
+ execute_status_command_mock.reset_mock()
+ print_exc_mock.reset_mock()
+
+ # Try exception to check proper logging
+ def side_effect(self):
+ raise Exception("TerribleException")
+ execute_command_mock.side_effect = side_effect
+ actionQueue.process_command(execution_command)
+ self.assertTrue(print_exc_mock.called)
+
+ print_exc_mock.reset_mock()
+
+ execute_status_command_mock.side_effect = side_effect
+ actionQueue.process_command(execution_command)
+ self.assertTrue(print_exc_mock.called)
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ @patch("CommandStatusDict.CommandStatusDict")
+ @patch.object(ActionQueue, "status_update_callback")
+ def test_log_execution_commands(self, status_update_callback_mock,
+ command_status_dict_mock,
+ cso_runCommand_mock):
+ custom_service_orchestrator_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : '',
+ 'exitcode' : 0
+ }
+ cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ config.set('logging', 'log_command_executes', 1)
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.execute_command(self.datanode_restart_command)
+ report = actionQueue.result()
+ expected = {'status': 'COMPLETED',
+ 'configurationTags': {'global': {'tag': 'v123'}},
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': u'cc',
+ 'structuredOut': '""',
+ 'roleCommand': u'CUSTOM_COMMAND',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 9,
+ 'customCommand': 'RESTART',
+ 'exitCode': 0}
+ # Agent caches configurationTags if custom_command RESTART completed
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(expected, report['reports'][0])
+
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch("__builtin__.open")
+ @patch.object(ActionQueue, "status_update_callback")
+ def test_auto_execute_command(self, status_update_callback_mock, open_mock):
+ # Make file read calls visible
+ def open_side_effect(file, mode):
+ if mode == 'r':
+ file_mock = MagicMock()
+ file_mock.read.return_value = "Read from " + str(file)
+ return file_mock
+ else:
+ return self.original_open(file, mode)
+ open_mock.side_effect = open_side_effect
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+ dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", -1)
+
+ actionQueue = ActionQueue(config, dummy_controller)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : ''
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ unfreeze_flag.wait()
+ return python_execution_result_dict
+ def patched_aq_execute_command(command):
+ # We have to perform patching for separate thread in the same thread
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.process_command(command)
+
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_auto_start_command, ))
+ execution_thread.start()
+ # check in progress report
+ # wait until ready
+ while True:
+ time.sleep(0.1)
+ if actionQueue.tasks_in_progress_or_pending():
+ break
+ # Continue command execution
+ unfreeze_flag.set()
+ # wait until ready
+ check_queue = True
+ while check_queue:
+ report = actionQueue.result()
+ if not actionQueue.tasks_in_progress_or_pending():
+ break
+ time.sleep(0.1)
+
+ self.assertEqual(len(report['reports']), 0)
+
+ ## Test failed execution
+ python_execution_result_dict['status'] = 'FAILED'
+ python_execution_result_dict['exitcode'] = 13
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_auto_start_command, ))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # check in progress report
+ # wait until ready
+ while check_queue:
+ report = actionQueue.result()
+ if not actionQueue.tasks_in_progress_or_pending():
+ break
+ time.sleep(0.1)
+
+ self.assertEqual(len(report['reports']), 0)
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch("__builtin__.open")
+ @patch.object(ActionQueue, "status_update_callback")
+ def test_execute_command(self, status_update_callback_mock, open_mock):
+ # Make file read calls visible
+ def open_side_effect(file, mode):
+ if mode == 'r':
+ file_mock = MagicMock()
+ file_mock.read.return_value = "Read from " + str(file)
+ return file_mock
+ else:
+ return self.original_open(file, mode)
+ open_mock.side_effect = open_side_effect
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : ''
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ unfreeze_flag.wait()
+ return python_execution_result_dict
+ def patched_aq_execute_command(command):
+ # We have to perform patching for separate thread in the same thread
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.execute_command(command)
+ ### Test install/start/stop command ###
+ ## Test successful execution with configuration tags
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_install_command, ))
+ execution_thread.start()
+ # check in progress report
+ # wait until ready
+ while True:
+ time.sleep(0.1)
+ report = actionQueue.result()
+ if len(report['reports']) != 0:
+ break
+ expected = {'status': 'IN_PROGRESS',
+ 'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
+ 'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
+ 'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
+ 'clusterName': u'cc',
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'exitCode': 777}
+ self.assertEqual(report['reports'][0], expected)
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+
+ # Continue command execution
+ unfreeze_flag.set()
+ # wait until ready
+ while report['reports'][0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ report = actionQueue.result()
+ # check report
+ configname = os.path.join(tempdir, 'config.json')
+ expected = {'status': 'COMPLETED',
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': u'cc',
+ 'structuredOut': '""',
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'configurationTags': {'global': {'tag': 'v1'}},
+ 'exitCode': 0}
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(report['reports'][0], expected)
+ self.assertTrue(os.path.isfile(configname))
+ # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
+ self.assertEqual(status_update_callback_mock.call_count, 2)
+ os.remove(configname)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']), 0)
+
+ ## Test failed execution
+ python_execution_result_dict['status'] = 'FAILED'
+ python_execution_result_dict['exitcode'] = 13
+ # We call method in a separate thread
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_install_command, ))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # check in progress report
+ # wait until ready
+ report = actionQueue.result()
+ while len(report['reports']) == 0 or \
+ report['reports'][0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ report = actionQueue.result()
+ # check report
+ expected = {'status': 'FAILED',
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': u'cc',
+ 'structuredOut': '""',
+ 'roleCommand': u'INSTALL',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 3,
+ 'exitCode': 13}
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(report['reports'][0], expected)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']), 0)
+
+ ### Test upgrade command ###
+ python_execution_result_dict['status'] = 'COMPLETE'
+ python_execution_result_dict['exitcode'] = 0
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_upgrade_command, ))
+ execution_thread.start()
+ unfreeze_flag.set()
+ # wait until ready
+ report = actionQueue.result()
+ while len(report['reports']) == 0 or \
+ report['reports'][0]['status'] == 'IN_PROGRESS':
+ time.sleep(0.1)
+ report = actionQueue.result()
+ # check report
+ expected = {'status': 'COMPLETED',
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': 'clusterName',
+ 'structuredOut': '""',
+ 'roleCommand': 'UPGRADE',
+ 'serviceName': 'serviceName',
+ 'role': 'role',
+ 'actionId': 17,
+ 'taskId': 'taskId',
+ 'exitCode': 0}
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(report['reports'][0], expected)
+
+ # now should not have reports (read complete/failed reports are deleted)
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']), 0)
+
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ @patch("CommandStatusDict.CommandStatusDict")
+ @patch.object(ActionQueue, "status_update_callback")
+ def test_store_configuration_tags(self, status_update_callback_mock,
+ command_status_dict_mock,
+ cso_runCommand_mock):
+ custom_service_orchestrator_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : '',
+ 'exitcode' : 0
+ }
+ cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.execute_command(self.datanode_restart_command)
+ report = actionQueue.result()
+ expected = {'status': 'COMPLETED',
+ 'configurationTags': {'global': {'tag': 'v123'}},
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': u'cc',
+ 'structuredOut': '""',
+ 'roleCommand': u'CUSTOM_COMMAND',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 9,
+ 'customCommand': 'RESTART',
+ 'exitCode': 0}
+ # Agent caches configurationTags if custom_command RESTART completed
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(expected, report['reports'][0])
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(ActualConfigHandler, "write_client_components")
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ @patch("CommandStatusDict.CommandStatusDict")
+ @patch.object(ActionQueue, "status_update_callback")
+ def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
+ command_status_dict_mock,
+ cso_runCommand_mock, write_client_components_mock):
+ custom_service_orchestrator_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : '',
+ 'exitcode' : 0
+ }
+ cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
+ report = actionQueue.result()
+ expected = {'status': 'COMPLETED',
+ 'configurationTags': {'global': {'tag': 'v123'}},
+ 'stderr': 'stderr',
+ 'stdout': 'out',
+ 'clusterName': u'cc',
+ 'structuredOut': '""',
+ 'roleCommand': u'CUSTOM_COMMAND',
+ 'serviceName': u'HDFS',
+ 'role': u'DATANODE',
+ 'actionId': '1-1',
+ 'taskId': 9,
+ 'customCommand': 'RESTART',
+ 'exitCode': 0}
+ # Agent caches configurationTags if custom_command RESTART completed
+ self.assertEqual(len(report['reports']), 1)
+ self.assertEqual(expected, report['reports'][0])
+ self.assertFalse(write_client_components_mock.called)
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(ActionQueue, "status_update_callback")
+ @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+ @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+ @patch.object(ActionQueue, "execute_command")
+ @patch.object(LiveStatus, "build")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_status_command(self, CustomServiceOrchestrator_mock,
+ build_mock, execute_command_mock, requestComponentSecurityState_mock,
+ requestComponentStatus_mock,
+ status_update_callback):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+ build_mock.return_value = {'dummy report': '' }
+
+ dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+
+ requestComponentStatus_mock.reset_mock()
+ requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+ requestComponentSecurityState_mock.reset_mock()
+ requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+ actionQueue.execute_status_command(self.status_command)
+ report = actionQueue.result()
+ expected = {'dummy report': '',
+ 'securityState' : 'UNKNOWN'}
+
+ self.assertEqual(len(report['componentStatus']), 1)
+ self.assertEqual(report['componentStatus'][0], expected)
+ self.assertTrue(requestComponentStatus_mock.called)
+
+ # Verifies recovery reporting in execute_status_command(): when the
+ # RecoveryManager says the component requires recovery, the component
+ # status report carries a 'sendExecCmdDet' flag -- 'True' while no
+ # execution command exists yet for the component, 'False' once one does.
+ # NOTE: @patch decorators apply bottom-up, so the positional mock
+ # arguments below mirror the decorators in reverse order.
+ @patch.object(RecoveryManager, "command_exists")
+ @patch.object(RecoveryManager, "requires_recovery")
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(ActionQueue, "status_update_callback")
+ @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+ @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+ @patch.object(ActionQueue, "execute_command")
+ @patch.object(LiveStatus, "build")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock,
+ build_mock, execute_command_mock, requestComponentSecurityState_mock,
+ requestComponentStatus_mock,
+ status_update_callback, requires_recovery_mock,
+ command_exists_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+ build_mock.return_value = {'dummy report': '' }
+ requires_recovery_mock.return_value = True
+ command_exists_mock.return_value = False
+
+ # Real RecoveryManager instance (not a mock) backed by a temp file.
+ dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
+
+ # Scenario 1: recovery required and no execution command exists yet
+ # -> report must include sendExecCmdDet == 'True'.
+ requestComponentStatus_mock.reset_mock()
+ requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+ requestComponentSecurityState_mock.reset_mock()
+ requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+ actionQueue.execute_status_command(self.status_command)
+ report = actionQueue.result()
+ expected = {'dummy report': '',
+ 'securityState' : 'UNKNOWN',
+ 'sendExecCmdDet': 'True'}
+
+ self.assertEqual(len(report['componentStatus']), 1)
+ self.assertEqual(report['componentStatus'][0], expected)
+ self.assertTrue(requestComponentStatus_mock.called)
+
+ # Scenario 2: an execution command already exists for the component
+ # -> report must include sendExecCmdDet == 'False'.
+ requires_recovery_mock.return_value = True
+ command_exists_mock.return_value = True
+ requestComponentStatus_mock.reset_mock()
+ requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+ requestComponentSecurityState_mock.reset_mock()
+ requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+ actionQueue.execute_status_command(self.status_command)
+ report = actionQueue.result()
+ expected = {'dummy report': '',
+ 'securityState' : 'UNKNOWN',
+ 'sendExecCmdDet': 'False'}
+
+ self.assertEqual(len(report['componentStatus']), 1)
+ self.assertEqual(report['componentStatus'][0], expected)
+ self.assertTrue(requestComponentStatus_mock.called)
+
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(ActionQueue, "status_update_callback")
+ @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+ @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+ @patch.object(ActionQueue, "execute_command")
+ @patch.object(LiveStatus, "build")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
+ requestComponentSecurityState_mock,
+ build_mock, execute_command_mock,
+ requestComponentStatus_mock,
+ status_update_callback):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+
+ requestComponentStatus_mock.reset_mock()
+ requestComponentStatus_mock.return_value = {
+ 'exitcode': 0,
+ 'stdout': 'out',
+ 'stderr': 'err',
+ 'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
+ }
+ build_mock.return_value = {'somestatusresult': 'aresult'}
+
+ actionQueue.execute_status_command(self.status_command_for_alerts)
+
+ report = actionQueue.result()
+
+ self.assertTrue(requestComponentStatus_mock.called)
+ self.assertEqual(len(report['componentStatus']), 1)
+ self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+
+ # Verifies ActionQueue.reset(): after two commands are queued, reset()
+ # must drain the command queue and clear the in-progress/pending state.
+ # Both Queue.get and process_command are mocked so the worker thread
+ # cannot consume the queued commands and race the qsize assertions.
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(Queue, "get")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_reset_queue(self, CustomServiceOrchestrator_mock,
+ get_mock, process_command_mock, gpeo_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+ config = MagicMock()
+ # Parallel execution disabled (0) for this test.
+ gpeo_mock.return_value = 0
+ config.get_parallel_exec_option = gpeo_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.start()
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+ actionQueue.reset()
+ self.assertTrue(actionQueue.commandQueue.empty())
+ self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+ # Brief pause so the worker thread observes the stop flag cleanly.
+ time.sleep(0.1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+ # NOTE(review): despite its name, this test exercises reset() on a running
+ # queue -- the same flow as test_reset_queue, minus the recovery manager
+ # and the tasks_in_progress_or_pending assertions. No cancel command is
+ # issued here; confirm the intended coverage.
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(Queue, "get")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_cancel(self, CustomServiceOrchestrator_mock,
+ get_mock, process_command_mock, gpeo_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ # Parallel execution disabled (0) for this test.
+ gpeo_mock.return_value = 0
+ config.get_parallel_exec_option = gpeo_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.start()
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ actionQueue.reset()
+ self.assertTrue(actionQueue.commandQueue.empty())
+ time.sleep(0.1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_parallel_exec(self, CustomServiceOrchestrator_mock,
+ process_command_mock, gpeo_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ gpeo_mock.return_value = 1
+ config.get_parallel_exec_option = gpeo_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ actionQueue.start()
+ time.sleep(1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(2, process_command_mock.call_count)
+ process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
+
+
+ # A retryable command that keeps failing is re-run up to the attempt
+ # limit: runCommand is invoked 3 times with backoff sleeps of 2 then 3
+ # seconds. The first attempt overrides the output files; retries append
+ # (override_output_files=False, retry=True).
+ @not_for_platform(PLATFORM_LINUX)
+ @patch("time.sleep")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
+ sleep_mock
+ ):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ # Every attempt fails with this result, forcing the retry path.
+ python_execution_result_dict = {
+ 'exitcode': 1,
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut': '',
+ 'status': 'FAILED'
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ return python_execution_result_dict
+
+ # Deep-copy the fixture: execute_command mutates the command dict.
+ command = copy.deepcopy(self.retryable_command)
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.execute_command(command)
+
+ #assert that python executor start
+ self.assertTrue(runCommand_mock.called)
+ self.assertEqual(3, runCommand_mock.call_count)
+ self.assertEqual(2, sleep_mock.call_count)
+ sleep_mock.assert_has_calls([call(2), call(3)], False)
+ runCommand_mock.assert_has_calls([
+ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+ os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
+ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+ os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True),
+ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+ os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
+
+
+ # Same failing retryable command as above, but with time.time() mocked so
+ # the elapsed time exceeds the command's retry window after the second
+ # attempt: only 2 runCommand calls and a single 2-second backoff sleep.
+ @patch("time.time")
+ @patch("time.sleep")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
+ sleep_mock, time_mock
+ ):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ # Every attempt fails with this result, forcing the retry path.
+ python_execution_result_dict = {
+ 'exitcode': 1,
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut': '',
+ 'status': 'FAILED'
+ }
+
+ # Scripted clock readings for successive time.time() calls; when INFO
+ # logging is enabled execute_command makes one extra time.time() call,
+ # hence the extra leading value.
+ times_arr = [8, 10, 14, 18, 22]
+ if self.logger.isEnabledFor(logging.INFO):
+ times_arr.insert(0, 4)
+ time_mock.side_effect = times_arr
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ return python_execution_result_dict
+
+ # Deep-copy the fixture: execute_command mutates the command dict.
+ command = copy.deepcopy(self.retryable_command)
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.execute_command(command)
+
+ #assert that python executor start
+ self.assertTrue(runCommand_mock.called)
+ self.assertEqual(2, runCommand_mock.call_count)
+ self.assertEqual(1, sleep_mock.call_count)
+ sleep_mock.assert_has_calls([call(2)], False)
+ runCommand_mock.assert_has_calls([
+ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+ os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
+ call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+ os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
+
+ # A retryable command that fails once and then succeeds must stop
+ # retrying after the success: 2 runCommand calls, one 2-second backoff.
+ @not_for_platform(PLATFORM_LINUX)
+ @patch("time.sleep")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
+ sleep_mock
+ ):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ # First attempt's result: FAILED -> triggers one retry.
+ execution_result_fail_dict = {
+ 'exitcode': 1,
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut': '',
+ 'status': 'FAILED'
+ }
+ # Second attempt's result: COMPLETED -> no further retries.
+ execution_result_succ_dict = {
+ 'exitcode': 0,
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut': '',
+ 'status': 'COMPLETED'
+ }
+
+ command = copy.deepcopy(self.retryable_command)
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
+ actionQueue.execute_command(command)
+
+ #assert that python executor start
+ self.assertTrue(runCommand_mock.called)
+ self.assertEqual(2, runCommand_mock.call_count)
+ self.assertEqual(1, sleep_mock.call_count)
+ sleep_mock.assert_any_call(2)
+
+ # A retryable command that succeeds on the first attempt runs exactly
+ # once -- no backoff sleep, no retry.
+ @not_for_platform(PLATFORM_LINUX)
+ @patch("time.sleep")
+ @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
+ sleep_mock
+ ):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ # Immediate success -> retry loop must exit after one attempt.
+ execution_result_succ_dict = {
+ 'exitcode': 0,
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut': '',
+ 'status': 'COMPLETED'
+ }
+
+ command = copy.deepcopy(self.retryable_command)
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = [execution_result_succ_dict]
+ actionQueue.execute_command(command)
+
+ #assert that python executor start
+ self.assertTrue(runCommand_mock.called)
+ self.assertFalse(sleep_mock.called)
+ self.assertEqual(1, runCommand_mock.call_count)
+
+ # Verifies dispatch of a BACKGROUND_EXECUTION_COMMAND: after draining the
+ # background and status queues the command must be tracked as IN_PROGRESS
+ # (it completes later via callback) and one report must be produced.
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "runCommand")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_execute_background_command(self, CustomServiceOrchestrator_mock,
+ runCommand_mock,
+ ):
+ CustomServiceOrchestrator_mock.return_value = None
+ # Sets the stub on the patched class attribute -- same mock object as
+ # runCommand_mock above.
+ CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
+ 'stdout': 'out-11',
+ 'stderr' : 'err-13'}
+
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+ # Deep-copy the fixture: processing mutates the command dict.
+ execute_command = copy.deepcopy(self.background_command)
+ actionQueue.put([execute_command])
+ actionQueue.processBackgroundQueueSafeEmpty();
+ actionQueue.processStatusCommandQueueSafeEmpty();
+
+ #assert that python execturor start
+ self.assertTrue(runCommand_mock.called)
+ runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
+ self.assertTrue(runningCommand is not None)
+ self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
+
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']),1)
+
+ # End-to-end run of a background command through a real PythonExecutor
+ # whose subprocess and file I/O are stubbed by patch_output_file(). The
+ # completion callback is wrapped so the final command status can be
+ # captured under a condition variable and asserted afterwards.
+ @patch.object(CustomServiceOrchestrator, "get_py_executor")
+ @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+ def test_execute_python_executor(self, resolve_script_path_mock,
+ get_py_executor_mock):
+
+ dummy_controller = MagicMock()
+ cfg = AmbariConfig()
+ cfg.set('agent', 'tolerate_download_failures', 'true')
+ cfg.set('agent', 'prefix', '.')
+ cfg.set('agent', 'cache_dir', 'background_tasks')
+
+ actionQueue = ActionQueue(cfg, dummy_controller)
+ # Real executor, but with its subprocess/file layer replaced by stubs
+ # that yield 'process_out' / 'process_err' (see patch_output_file below).
+ pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+ patch_output_file(pyex)
+ get_py_executor_mock.return_value = pyex
+ actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+
+ result = {}
+ lock = threading.RLock()
+ complete_done = threading.Condition(lock)
+
+ # Runs after the real completion callback; snapshots the condensed
+ # result and final command status, then wakes the waiting test thread.
+ def command_complete_w(process_condensed_result, handle):
+ with lock:
+ result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
+ 'handle' : copy.copy(handle),
+ 'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
+ }
+ complete_done.notifyAll()
+
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
+ None, command_complete_w)
+ actionQueue.put([self.background_command])
+ actionQueue.processBackgroundQueueSafeEmpty();
+ actionQueue.processStatusCommandQueueSafeEmpty();
+
+ # Wait (bounded) for the background thread to report completion.
+ with lock:
+ complete_done.wait(0.1)
+
+ finished_status = result['command_complete']['command_status']
+ self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
+ self.assertEqual(finished_status['stdout'], 'process_out')
+ self.assertEqual(finished_status['stderr'], 'process_err')
+ self.assertEqual(finished_status['exitCode'], 0)
+
+
+ runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
+ self.assertTrue(runningCommand is not None)
+
+ report = actionQueue.result()
+ self.assertEqual(len(report['reports']),1)
+ self.assertEqual(report['reports'][0]['stdout'],'process_out')
+# self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+
+
+
+ # Class-level fixture: a CANCEL_COMMAND aimed at background task ids
+ # 13 and 14 (roleParams.cancelTaskIdTargets). Not referenced in this
+ # chunk -- presumably consumed by cancel-related tests elsewhere in the
+ # class; verify before removing.
+ cancel_background_command = {
+ "commandType":"CANCEL_COMMAND",
+ "role":"AMBARI_SERVER_ACTION",
+ "roleCommand":"ABORT",
+ "commandId":"2--1",
+ "taskId":20,
+ "clusterName":"c1",
+ "serviceName":"",
+ "hostname":"c6401",
+ "roleParams":{
+ "cancelTaskIdTargets":"13,14"
+ },
+ }
+
+# Replaces a PythonExecutor's subprocess and file I/O with in-memory stubs:
+# the fake subprocess "writes" process_out/process_err and exits 0, and
+# read_result_from_files returns those canned values plus a structured-out
+# JSON string. Lets tests drive the executor without spawning a process.
+def patch_output_file(pythonExecutor):
+ # Fake subprocess launcher: records output via the tmp file handles and
+ # returns a mock process with pid 33 and returncode 0.
+ def windows_py(command, tmpout, tmperr):
+ proc = MagicMock()
+ proc.pid = 33
+ proc.returncode = 0
+ with tmpout:
+ tmpout.write('process_out')
+ with tmperr:
+ tmperr.write('process_err')
+ return proc
+ # Output/error "files" are plain mocks -- nothing touches the disk.
+ def open_subprocess_files_win(fout, ferr, f):
+ return MagicMock(), MagicMock()
+ def read_result_from_files(out_path, err_path, structured_out_path):
+ return 'process_out', 'process_err', '{"a": "b."}'
+ pythonExecutor.launch_python_subprocess = windows_py
+ pythonExecutor.open_subprocess_files = open_subprocess_files_win
+ pythonExecutor.read_result_from_files = read_result_from_files
+
+# Wraps func so that optional before/after hooks run around each call with
+# the same arguments; func's return value is passed through unchanged.
+# NOTE: the misspelled name 'wraped' is kept -- it is referenced by
+# test_execute_python_executor above.
+def wraped(func, before = None, after = None):
+ def wrapper(*args, **kwargs):
+ if(before is not None):
+ before(*args, **kwargs)
+ ret = func(*args, **kwargs)
+ if(after is not None):
+ after(*args, **kwargs)
+ return ret
+ return wrapper
+