You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/04/22 20:38:57 UTC
ambari git commit: Ambari-8189. Ambari agent support for parallel
task execution during deployment (Ivan Mitic via smohanty)
Repository: ambari
Updated Branches:
refs/heads/trunk fb87f8d29 -> b1196605e
Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b1196605
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b1196605
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b1196605
Branch: refs/heads/trunk
Commit: b1196605ee95560cbda723bdcb3590d84bd434ed
Parents: fb87f8d
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Apr 22 11:38:42 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Apr 22 11:38:42 2015 -0700
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 1 +
ambari-agent/conf/windows/ambari-agent.ini | 1 +
.../src/main/python/ambari_agent/ActionQueue.py | 33 ++++++---
.../main/python/ambari_agent/AmbariConfig.py | 4 ++
.../ambari_agent/CustomServiceOrchestrator.py | 11 ++-
.../main/python/ambari_agent/PythonExecutor.py | 11 ++-
.../test/python/ambari_agent/TestActionQueue.py | 73 +++++++++++++++-----
.../TestCustomServiceOrchestrator.py | 24 ++++---
.../test/python/ambari_agent/TestHeartbeat.py | 10 +--
9 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index ed9dab3..173bb51 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -29,6 +29,7 @@ ping_port=8670
cache_dir=/var/lib/ambari-agent/cache
tolerate_download_failures=true
run_as_user=root
+parallel_execution=0
[command]
maxretries=2
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/conf/windows/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index 377dbf4..61a3ad9 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -28,6 +28,7 @@ data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir=cache
tolerate_download_failures=true
+parallel_execution=0
[command]
maxretries=2
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 212226c..c7286bc 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -80,7 +80,9 @@ class ActionQueue(threading.Thread):
self._stop = threading.Event()
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
-
+ self.parallel_execution = config.get_parallel_exec_option()
+ if self.parallel_execution == 1:
+ logger.info("Parallel execution is enabled, will start Agent commands in parallel")
def stop(self):
self._stop.set()
@@ -145,10 +147,22 @@ class ActionQueue(threading.Thread):
self.processBackgroundQueueSafeEmpty();
self.processStatusCommandQueueSafeEmpty();
try:
- command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- self.process_command(command)
+ if self.parallel_execution == 0:
+ command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+ self.process_command(command)
+ else:
+ # If parallel execution is enabled, just kick off all available
+ # commands using separate threads
+ while (True):
+ command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+ logger.info("Kicking off a thread for the command, id=" +
+ str(command['commandId']) + " taskId=" + str(command['taskId']))
+ t = threading.Thread(target=self.process_command, args=(command,))
+ t.daemon = True
+ t.start()
except (Queue.Empty):
pass
+
def processBackgroundQueueSafeEmpty(self):
while not self.backgroundCommandQueue.empty():
try:
@@ -324,8 +338,9 @@ class ActionQueue(threading.Thread):
def command_was_canceled(self):
self.customServiceOrchestrator
- def on_background_command_complete_callback(self, process_condenced_result, handle):
- logger.debug('Start callback: %s' % process_condenced_result)
+
+ def on_background_command_complete_callback(self, process_condensed_result, handle):
+ logger.debug('Start callback: %s' % process_condensed_result)
logger.debug('The handle is: %s' % handle)
status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
@@ -340,10 +355,10 @@ class ActionQueue(threading.Thread):
roleResult = self.commandStatuses.generate_report_template(handle.command)
roleResult.update({
- 'stdout': process_condenced_result['stdout'] + aborted_postfix,
- 'stderr': process_condenced_result['stderr'] + aborted_postfix,
- 'exitCode': process_condenced_result['exitcode'],
- 'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
+ 'stdout': process_condensed_result['stdout'] + aborted_postfix,
+ 'stderr': process_condensed_result['stderr'] + aborted_postfix,
+ 'exitCode': process_condensed_result['exitcode'],
+ 'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
'status': status,
})
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index ffaaac7..ffb56bd 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,6 +43,7 @@ data_cleanup_max_age=2592000
data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
+parallel_execution=0
[services]
@@ -245,6 +246,9 @@ class AmbariConfig:
else:
return False
+ def get_parallel_exec_option(self):
+ return self.get('agent', 'parallel_execution', 0)
+
def updateConfigServerHostname(configFile, new_host):
# update agent config file
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 54738a6..b107e3f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -61,7 +61,6 @@ class CustomServiceOrchestrator():
self.tmp_dir = config.get('agent', 'prefix')
self.exec_tmp_dir = config.get('agent', 'tmp_dir')
self.file_cache = FileCache(config)
- self.python_executor = PythonExecutor(self.tmp_dir, config)
self.status_commands_stdout = os.path.join(self.tmp_dir,
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
@@ -96,6 +95,13 @@ class CustomServiceOrchestrator():
else:
logger.warn("Unable to find pid by taskId = %s" % task_id)
+ def get_py_executor(self):
+ """
+ Wrapper for unit testing
+ :return:
+ """
+ return PythonExecutor(self.tmp_dir, self.config)
+
def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
override_output_files=True, retry=False):
"""
@@ -172,10 +178,11 @@ class CustomServiceOrchestrator():
if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
raise AgentException("Background commands are supported without hooks only")
+ python_executor = self.get_py_executor()
for py_file, current_base_dir in filtered_py_file_list:
log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
script_params = [command_name, json_path, current_base_dir]
- ret = self.python_executor.run_file(py_file, script_params,
+ ret = python_executor.run_file(py_file, script_params,
self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
tmpstrucoutfile, logger_level, self.map_task_to_process,
task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index f215272..09be145 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -43,11 +43,11 @@ class PythonExecutor:
used as a singleton for a concurrent execution of python scripts
"""
NO_ERROR = "none"
- grep = Grep()
- event = threading.Event()
- python_process_has_been_killed = False
def __init__(self, tmpDir, config):
+ self.grep = Grep()
+ self.event = threading.Event()
+ self.python_process_has_been_killed = False
self.tmpDir = tmpDir
self.config = config
pass
@@ -181,11 +181,10 @@ class PythonExecutor:
def condenseOutput(self, stdout, stderr, retcode, structured_out):
log_lines_count = self.config.get('heartbeat', 'log_lines_count')
- grep = self.grep
result = {
"exitcode": retcode,
- "stdout": grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
- "stderr": grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
+ "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
+ "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
"structuredOut" : structured_out
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index f43d3f7..6aab74e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -228,14 +228,17 @@ class TestActionQueue(TestCase):
}
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock):
+ get_mock, process_command_mock, get_parallel_exec_option_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
+ get_parallel_exec_option_mock.return_value = 0
+ config.get_parallel_exec_option = get_parallel_exec_option_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
time.sleep(0.1)
@@ -481,7 +484,7 @@ class TestActionQueue(TestCase):
}
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig().getConfig()
+ config = AmbariConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -523,7 +526,7 @@ class TestActionQueue(TestCase):
}
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig().getConfig()
+ config = AmbariConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -564,7 +567,7 @@ class TestActionQueue(TestCase):
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
build_mock.return_value = {'dummy report': '' }
@@ -600,7 +603,7 @@ class TestActionQueue(TestCase):
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
requestComponentStatus_mock.reset_mock()
@@ -620,14 +623,17 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['componentStatus']), 1)
self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_reset_queue(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock):
+ get_mock, process_command_mock, gpeo_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
+ gpeo_mock.return_value = 0
+ config.get_parallel_exec_option = gpeo_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -639,14 +645,17 @@ class TestActionQueue(TestCase):
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_cancel(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock):
+ get_mock, process_command_mock, gpeo_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
+ gpeo_mock.return_value = 0
+ config.get_parallel_exec_option = gpeo_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -658,6 +667,27 @@ class TestActionQueue(TestCase):
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ @patch.object(AmbariConfig, "get_parallel_exec_option")
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_parallel_exec(self, CustomServiceOrchestrator_mock,
+ process_command_mock, gpeo_mock):
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ gpeo_mock.return_value = 1
+ config.get_parallel_exec_option = gpeo_mock
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.start()
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ time.sleep(1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+ self.assertEqual(2, process_command_mock.call_count)
+ process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
+
@patch("time.sleep")
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -668,7 +698,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
python_execution_result_dict = {
'exitcode': 1,
'stdout': 'out',
@@ -706,7 +736,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
execution_result_fail_dict = {
'exitcode': 1,
'stdout': 'out',
@@ -742,7 +772,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
execution_result_succ_dict = {
'exitcode': 0,
'stdout': 'out',
@@ -774,7 +804,7 @@ class TestActionQueue(TestCase):
'stderr' : 'err-13'}
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
execute_command = copy.deepcopy(self.background_command)
actionQueue.put([execute_command])
@@ -791,39 +821,44 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['reports']),1)
@not_for_platform(PLATFORM_WINDOWS)
+ @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
@patch.object(StackVersionsFileHandler, "read_stack_version")
- def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
+ def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
+ get_py_executor_mock):
dummy_controller = MagicMock()
- cfg = AmbariConfig().getConfig()
+ cfg = AmbariConfig()
cfg.set('agent', 'tolerate_download_failures', 'true')
cfg.set('agent', 'prefix', '.')
cfg.set('agent', 'cache_dir', 'background_tasks')
actionQueue = ActionQueue(cfg, dummy_controller)
- patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
+ pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+ patch_output_file(pyex)
+ get_py_executor_mock.return_value = pyex
actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
result = {}
lock = threading.RLock()
complete_done = threading.Condition(lock)
- def command_complete_w(process_condenced_result, handle):
+ def command_complete_w(process_condensed_result, handle):
with lock:
- result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
+ result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
'handle' : copy.copy(handle),
'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
}
complete_done.notifyAll()
-
- actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
+
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
+ None, command_complete_w)
actionQueue.put([self.background_command])
actionQueue.processBackgroundQueueSafeEmpty();
actionQueue.processStatusCommandQueueSafeEmpty();
with lock:
- complete_done.wait(.1)
+ complete_done.wait(0.1)
finished_status = result['command_complete']['command_status']
self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index a9e604d..2fb2ae5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -389,19 +389,21 @@ class TestCustomServiceOrchestrator(TestCase):
from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch("ambari_commons.shell.kill_process_with_children")
@patch.object(FileCache, "__init__")
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
@patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
@patch.object(StackVersionsFileHandler, "read_stack_version")
- def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
- kill_process_with_children_mock):
+ def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock,
+ resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock,
+ get_py_executor_mock):
FileCache_mock.return_value = None
FileCache_mock.cache_dir = MagicMock()
resolve_hook_script_path_mock.return_value = None
# shell.kill_process_with_children = MagicMock()
dummy_controller = MagicMock()
- cfg = AmbariConfig().getConfig()
+ cfg = AmbariConfig()
cfg.set('agent', 'tolerate_download_failures', 'true')
cfg.set('agent', 'prefix', '.')
cfg.set('agent', 'cache_dir', 'background_tasks')
@@ -419,8 +421,10 @@ class TestCustomServiceOrchestrator(TestCase):
import TestActionQueue
import copy
- TestActionQueue.patch_output_file(orchestrator.python_executor)
- orchestrator.python_executor.prepare_process_result = MagicMock()
+ pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+ TestActionQueue.patch_output_file(pyex)
+ pyex.prepare_process_result = MagicMock()
+ get_py_executor_mock.return_value = pyex
orchestrator.dump_command_to_json = MagicMock()
lock = threading.RLock()
@@ -598,12 +602,14 @@ class TestCustomServiceOrchestrator(TestCase):
self.assertEqual('UNKNOWN', status)
+ @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
@patch.object(FileCache, "__init__")
@patch.object(FileCache, "get_custom_actions_base_dir")
def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
FileCache_mock,
- dump_command_to_json_mock):
+ dump_command_to_json_mock,
+ get_py_executor_mock):
FileCache_mock.return_value = None
get_custom_actions_base_dir_mock.return_value = "some path"
_, script = tempfile.mkstemp()
@@ -625,8 +631,10 @@ class TestCustomServiceOrchestrator(TestCase):
orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
import TestActionQueue
- TestActionQueue.patch_output_file(orchestrator.python_executor)
- orchestrator.python_executor.condenseOutput = MagicMock()
+ pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
+ TestActionQueue.patch_output_file(pyex)
+ pyex.condenseOutput = MagicMock()
+ get_py_executor_mock.return_value = pyex
orchestrator.dump_command_to_json = MagicMock()
ret = orchestrator.runCommand(command, "out.txt", "err.txt")
http://git-wip-us.apache.org/repos/asf/ambari/blob/b1196605/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 2f13ef5..0c78fdb 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -55,7 +55,7 @@ class TestHeartbeat(TestCase):
def test_build(self):
- config = AmbariConfig.AmbariConfig().getConfig()
+ config = AmbariConfig.AmbariConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -94,7 +94,7 @@ class TestHeartbeat(TestCase):
'exitCode': 777}],
'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
}
- config = AmbariConfig.AmbariConfig().getConfig()
+ config = AmbariConfig.AmbariConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -110,7 +110,7 @@ class TestHeartbeat(TestCase):
@patch.object(ActionQueue, "result")
def test_build_long_result(self, result_mock):
- config = AmbariConfig.AmbariConfig().getConfig()
+ config = AmbariConfig.AmbariConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -205,7 +205,7 @@ class TestHeartbeat(TestCase):
@patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
@patch.object(HostInfoLinux, 'register')
def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
- config = AmbariConfig.AmbariConfig().getConfig()
+ config = AmbariConfig.AmbariConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -232,7 +232,7 @@ class TestHeartbeat(TestCase):
@patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
@patch.object(HostInfoLinux, 'register')
def test_heartbeat_host_check_no_cmd(self, register_mock):
- config = AmbariConfig.AmbariConfig().getConfig()
+ config = AmbariConfig.AmbariConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")