You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/04/22 22:13:00 UTC
[2/2] ambari git commit: Revert "Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)"
Revert "Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)"
This reverts commit b1196605ee95560cbda723bdcb3590d84bd434ed.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a93a3126
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a93a3126
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a93a3126
Branch: refs/heads/trunk
Commit: a93a31265d7ba8799b911a015697415860651bfd
Parents: 606ac37
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Apr 22 13:12:49 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Apr 22 13:12:49 2015 -0700
----------------------------------------------------------------------
ambari-agent/conf/unix/ambari-agent.ini | 1 -
ambari-agent/conf/windows/ambari-agent.ini | 1 -
.../src/main/python/ambari_agent/ActionQueue.py | 33 +++------
.../main/python/ambari_agent/AmbariConfig.py | 4 --
.../ambari_agent/CustomServiceOrchestrator.py | 11 +--
.../main/python/ambari_agent/PythonExecutor.py | 11 +--
.../test/python/ambari_agent/TestActionQueue.py | 73 +++++---------------
.../TestCustomServiceOrchestrator.py | 24 +++----
.../test/python/ambari_agent/TestHeartbeat.py | 10 +--
9 files changed, 49 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 173bb51..ed9dab3 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -29,7 +29,6 @@ ping_port=8670
cache_dir=/var/lib/ambari-agent/cache
tolerate_download_failures=true
run_as_user=root
-parallel_execution=0
[command]
maxretries=2
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/conf/windows/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index 61a3ad9..377dbf4 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -28,7 +28,6 @@ data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir=cache
tolerate_download_failures=true
-parallel_execution=0
[command]
maxretries=2
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c7286bc..212226c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -80,9 +80,7 @@ class ActionQueue(threading.Thread):
self._stop = threading.Event()
self.tmpdir = config.get('agent', 'prefix')
self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
- self.parallel_execution = config.get_parallel_exec_option()
- if self.parallel_execution == 1:
- logger.info("Parallel execution is enabled, will start Agent commands in parallel")
+
def stop(self):
self._stop.set()
@@ -147,22 +145,10 @@ class ActionQueue(threading.Thread):
self.processBackgroundQueueSafeEmpty();
self.processStatusCommandQueueSafeEmpty();
try:
- if self.parallel_execution == 0:
- command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- self.process_command(command)
- else:
- # If parallel execution is enabled, just kick off all available
- # commands using separate threads
- while (True):
- command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- logger.info("Kicking off a thread for the command, id=" +
- str(command['commandId']) + " taskId=" + str(command['taskId']))
- t = threading.Thread(target=self.process_command, args=(command,))
- t.daemon = True
- t.start()
+ command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+ self.process_command(command)
except (Queue.Empty):
pass
-
def processBackgroundQueueSafeEmpty(self):
while not self.backgroundCommandQueue.empty():
try:
@@ -338,9 +324,8 @@ class ActionQueue(threading.Thread):
def command_was_canceled(self):
self.customServiceOrchestrator
-
- def on_background_command_complete_callback(self, process_condensed_result, handle):
- logger.debug('Start callback: %s' % process_condensed_result)
+ def on_background_command_complete_callback(self, process_condenced_result, handle):
+ logger.debug('Start callback: %s' % process_condenced_result)
logger.debug('The handle is: %s' % handle)
status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
@@ -355,10 +340,10 @@ class ActionQueue(threading.Thread):
roleResult = self.commandStatuses.generate_report_template(handle.command)
roleResult.update({
- 'stdout': process_condensed_result['stdout'] + aborted_postfix,
- 'stderr': process_condensed_result['stderr'] + aborted_postfix,
- 'exitCode': process_condensed_result['exitcode'],
- 'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
+ 'stdout': process_condenced_result['stdout'] + aborted_postfix,
+ 'stderr': process_condenced_result['stderr'] + aborted_postfix,
+ 'exitCode': process_condenced_result['exitcode'],
+ 'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
'status': status,
})
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index ffb56bd..ffaaac7 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,7 +43,6 @@ data_cleanup_max_age=2592000
data_cleanup_max_size_MB = 100
ping_port=8670
cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
-parallel_execution=0
[services]
@@ -246,9 +245,6 @@ class AmbariConfig:
else:
return False
- def get_parallel_exec_option(self):
- return self.get('agent', 'parallel_execution', 0)
-
def updateConfigServerHostname(configFile, new_host):
# update agent config file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index b107e3f..54738a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -61,6 +61,7 @@ class CustomServiceOrchestrator():
self.tmp_dir = config.get('agent', 'prefix')
self.exec_tmp_dir = config.get('agent', 'tmp_dir')
self.file_cache = FileCache(config)
+ self.python_executor = PythonExecutor(self.tmp_dir, config)
self.status_commands_stdout = os.path.join(self.tmp_dir,
'status_command_stdout.txt')
self.status_commands_stderr = os.path.join(self.tmp_dir,
@@ -95,13 +96,6 @@ class CustomServiceOrchestrator():
else:
logger.warn("Unable to find pid by taskId = %s" % task_id)
- def get_py_executor(self):
- """
- Wrapper for unit testing
- :return:
- """
- return PythonExecutor(self.tmp_dir, self.config)
-
def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
override_output_files=True, retry=False):
"""
@@ -178,11 +172,10 @@ class CustomServiceOrchestrator():
if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
raise AgentException("Background commands are supported without hooks only")
- python_executor = self.get_py_executor()
for py_file, current_base_dir in filtered_py_file_list:
log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
script_params = [command_name, json_path, current_base_dir]
- ret = python_executor.run_file(py_file, script_params,
+ ret = self.python_executor.run_file(py_file, script_params,
self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
tmpstrucoutfile, logger_level, self.map_task_to_process,
task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 09be145..f215272 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -43,11 +43,11 @@ class PythonExecutor:
used as a singleton for a concurrent execution of python scripts
"""
NO_ERROR = "none"
+ grep = Grep()
+ event = threading.Event()
+ python_process_has_been_killed = False
def __init__(self, tmpDir, config):
- self.grep = Grep()
- self.event = threading.Event()
- self.python_process_has_been_killed = False
self.tmpDir = tmpDir
self.config = config
pass
@@ -181,10 +181,11 @@ class PythonExecutor:
def condenseOutput(self, stdout, stderr, retcode, structured_out):
log_lines_count = self.config.get('heartbeat', 'log_lines_count')
+ grep = self.grep
result = {
"exitcode": retcode,
- "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
- "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
+ "stdout": grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
+ "stderr": grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
"structuredOut" : structured_out
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 6aab74e..f43d3f7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -228,17 +228,14 @@ class TestActionQueue(TestCase):
}
- @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, get_parallel_exec_option_mock):
+ get_mock, process_command_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
- get_parallel_exec_option_mock.return_value = 0
- config.get_parallel_exec_option = get_parallel_exec_option_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
time.sleep(0.1)
@@ -484,7 +481,7 @@ class TestActionQueue(TestCase):
}
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
+ config = AmbariConfig().getConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -526,7 +523,7 @@ class TestActionQueue(TestCase):
}
cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
- config = AmbariConfig()
+ config = AmbariConfig().getConfig()
tempdir = tempfile.gettempdir()
config.set('agent', 'prefix', tempdir)
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -567,7 +564,7 @@ class TestActionQueue(TestCase):
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
build_mock.return_value = {'dummy report': '' }
@@ -603,7 +600,7 @@ class TestActionQueue(TestCase):
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
requestComponentStatus_mock.reset_mock()
@@ -623,17 +620,14 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['componentStatus']), 1)
self.assertTrue(report['componentStatus'][0].has_key('alerts'))
- @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_reset_queue(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, gpeo_mock):
+ get_mock, process_command_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
- gpeo_mock.return_value = 0
- config.get_parallel_exec_option = gpeo_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -645,17 +639,14 @@ class TestActionQueue(TestCase):
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- @patch.object(AmbariConfig, "get_parallel_exec_option")
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_cancel(self, CustomServiceOrchestrator_mock,
- get_mock, process_command_mock, gpeo_mock):
+ get_mock, process_command_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
- gpeo_mock.return_value = 0
- config.get_parallel_exec_option = gpeo_mock
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -667,27 +658,6 @@ class TestActionQueue(TestCase):
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- @patch.object(AmbariConfig, "get_parallel_exec_option")
- @patch.object(ActionQueue, "process_command")
- @patch.object(CustomServiceOrchestrator, "__init__")
- def test_parallel_exec(self, CustomServiceOrchestrator_mock,
- process_command_mock, gpeo_mock):
- CustomServiceOrchestrator_mock.return_value = None
- dummy_controller = MagicMock()
- config = MagicMock()
- gpeo_mock.return_value = 1
- config.get_parallel_exec_option = gpeo_mock
- actionQueue = ActionQueue(config, dummy_controller)
- actionQueue.start()
- actionQueue.put([self.datanode_install_command, self.hbase_install_command])
- self.assertEqual(2, actionQueue.commandQueue.qsize())
- time.sleep(1)
- actionQueue.stop()
- actionQueue.join()
- self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
- self.assertEqual(2, process_command_mock.call_count)
- process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
-
@patch("time.sleep")
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -698,7 +668,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
python_execution_result_dict = {
'exitcode': 1,
'stdout': 'out',
@@ -736,7 +706,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
execution_result_fail_dict = {
'exitcode': 1,
'stdout': 'out',
@@ -772,7 +742,7 @@ class TestActionQueue(TestCase):
):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
execution_result_succ_dict = {
'exitcode': 0,
'stdout': 'out',
@@ -804,7 +774,7 @@ class TestActionQueue(TestCase):
'stderr' : 'err-13'}
dummy_controller = MagicMock()
- actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+ actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
execute_command = copy.deepcopy(self.background_command)
actionQueue.put([execute_command])
@@ -821,44 +791,39 @@ class TestActionQueue(TestCase):
self.assertEqual(len(report['reports']),1)
@not_for_platform(PLATFORM_WINDOWS)
- @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
@patch.object(StackVersionsFileHandler, "read_stack_version")
- def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
- get_py_executor_mock):
+ def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
dummy_controller = MagicMock()
- cfg = AmbariConfig()
+ cfg = AmbariConfig().getConfig()
cfg.set('agent', 'tolerate_download_failures', 'true')
cfg.set('agent', 'prefix', '.')
cfg.set('agent', 'cache_dir', 'background_tasks')
actionQueue = ActionQueue(cfg, dummy_controller)
- pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
- patch_output_file(pyex)
- get_py_executor_mock.return_value = pyex
+ patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
result = {}
lock = threading.RLock()
complete_done = threading.Condition(lock)
- def command_complete_w(process_condensed_result, handle):
+ def command_complete_w(process_condenced_result, handle):
with lock:
- result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
+ result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result),
'handle' : copy.copy(handle),
'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
}
complete_done.notifyAll()
-
- actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
- None, command_complete_w)
+
+ actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
actionQueue.put([self.background_command])
actionQueue.processBackgroundQueueSafeEmpty();
actionQueue.processStatusCommandQueueSafeEmpty();
with lock:
- complete_done.wait(0.1)
+ complete_done.wait(.1)
finished_status = result['command_complete']['command_status']
self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 2fb2ae5..a9e604d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -389,21 +389,19 @@ class TestCustomServiceOrchestrator(TestCase):
from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
- @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch("ambari_commons.shell.kill_process_with_children")
@patch.object(FileCache, "__init__")
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
@patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
@patch.object(StackVersionsFileHandler, "read_stack_version")
- def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock,
- resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock,
- get_py_executor_mock):
+ def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
+ kill_process_with_children_mock):
FileCache_mock.return_value = None
FileCache_mock.cache_dir = MagicMock()
resolve_hook_script_path_mock.return_value = None
# shell.kill_process_with_children = MagicMock()
dummy_controller = MagicMock()
- cfg = AmbariConfig()
+ cfg = AmbariConfig().getConfig()
cfg.set('agent', 'tolerate_download_failures', 'true')
cfg.set('agent', 'prefix', '.')
cfg.set('agent', 'cache_dir', 'background_tasks')
@@ -421,10 +419,8 @@ class TestCustomServiceOrchestrator(TestCase):
import TestActionQueue
import copy
- pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
- TestActionQueue.patch_output_file(pyex)
- pyex.prepare_process_result = MagicMock()
- get_py_executor_mock.return_value = pyex
+ TestActionQueue.patch_output_file(orchestrator.python_executor)
+ orchestrator.python_executor.prepare_process_result = MagicMock()
orchestrator.dump_command_to_json = MagicMock()
lock = threading.RLock()
@@ -602,14 +598,12 @@ class TestCustomServiceOrchestrator(TestCase):
self.assertEqual('UNKNOWN', status)
- @patch.object(CustomServiceOrchestrator, "get_py_executor")
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
@patch.object(FileCache, "__init__")
@patch.object(FileCache, "get_custom_actions_base_dir")
def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
FileCache_mock,
- dump_command_to_json_mock,
- get_py_executor_mock):
+ dump_command_to_json_mock):
FileCache_mock.return_value = None
get_custom_actions_base_dir_mock.return_value = "some path"
_, script = tempfile.mkstemp()
@@ -631,10 +625,8 @@ class TestCustomServiceOrchestrator(TestCase):
orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
import TestActionQueue
- pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
- TestActionQueue.patch_output_file(pyex)
- pyex.condenseOutput = MagicMock()
- get_py_executor_mock.return_value = pyex
+ TestActionQueue.patch_output_file(orchestrator.python_executor)
+ orchestrator.python_executor.condenseOutput = MagicMock()
orchestrator.dump_command_to_json = MagicMock()
ret = orchestrator.runCommand(command, "out.txt", "err.txt")
http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 0c78fdb..2f13ef5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -55,7 +55,7 @@ class TestHeartbeat(TestCase):
def test_build(self):
- config = AmbariConfig.AmbariConfig()
+ config = AmbariConfig.AmbariConfig().getConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -94,7 +94,7 @@ class TestHeartbeat(TestCase):
'exitCode': 777}],
'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
}
- config = AmbariConfig.AmbariConfig()
+ config = AmbariConfig.AmbariConfig().getConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -110,7 +110,7 @@ class TestHeartbeat(TestCase):
@patch.object(ActionQueue, "result")
def test_build_long_result(self, result_mock):
- config = AmbariConfig.AmbariConfig()
+ config = AmbariConfig.AmbariConfig().getConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -205,7 +205,7 @@ class TestHeartbeat(TestCase):
@patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
@patch.object(HostInfoLinux, 'register')
def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
- config = AmbariConfig.AmbariConfig()
+ config = AmbariConfig.AmbariConfig().getConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")
@@ -232,7 +232,7 @@ class TestHeartbeat(TestCase):
@patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
@patch.object(HostInfoLinux, 'register')
def test_heartbeat_host_check_no_cmd(self, register_mock):
- config = AmbariConfig.AmbariConfig()
+ config = AmbariConfig.AmbariConfig().getConfig()
config.set('agent', 'prefix', 'tmp')
config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
config.set('agent', 'tolerate_download_failures', "true")