You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/04/22 22:13:00 UTC

[2/2] ambari git commit: Revert "Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)"

Revert "Ambari-8189. Ambari agent support for parallel task execution during deployment (Ivan Mitic via smohanty)"

This reverts commit b1196605ee95560cbda723bdcb3590d84bd434ed.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a93a3126
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a93a3126
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a93a3126

Branch: refs/heads/trunk
Commit: a93a31265d7ba8799b911a015697415860651bfd
Parents: 606ac37
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Wed Apr 22 13:12:49 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Wed Apr 22 13:12:49 2015 -0700

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  1 -
 ambari-agent/conf/windows/ambari-agent.ini      |  1 -
 .../src/main/python/ambari_agent/ActionQueue.py | 33 +++------
 .../main/python/ambari_agent/AmbariConfig.py    |  4 --
 .../ambari_agent/CustomServiceOrchestrator.py   | 11 +--
 .../main/python/ambari_agent/PythonExecutor.py  | 11 +--
 .../test/python/ambari_agent/TestActionQueue.py | 73 +++++---------------
 .../TestCustomServiceOrchestrator.py            | 24 +++----
 .../test/python/ambari_agent/TestHeartbeat.py   | 10 +--
 9 files changed, 49 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 173bb51..ed9dab3 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -29,7 +29,6 @@ ping_port=8670
 cache_dir=/var/lib/ambari-agent/cache
 tolerate_download_failures=true
 run_as_user=root
-parallel_execution=0
 
 [command]
 maxretries=2

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/conf/windows/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/windows/ambari-agent.ini b/ambari-agent/conf/windows/ambari-agent.ini
index 61a3ad9..377dbf4 100644
--- a/ambari-agent/conf/windows/ambari-agent.ini
+++ b/ambari-agent/conf/windows/ambari-agent.ini
@@ -28,7 +28,6 @@ data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir=cache
 tolerate_download_failures=true
-parallel_execution=0
 
 [command]
 maxretries=2

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index c7286bc..212226c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -80,9 +80,7 @@ class ActionQueue(threading.Thread):
     self._stop = threading.Event()
     self.tmpdir = config.get('agent', 'prefix')
     self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
-    self.parallel_execution = config.get_parallel_exec_option()
-    if self.parallel_execution == 1:
-      logger.info("Parallel execution is enabled, will start Agent commands in parallel")
+
 
   def stop(self):
     self._stop.set()
@@ -147,22 +145,10 @@ class ActionQueue(threading.Thread):
       self.processBackgroundQueueSafeEmpty();
       self.processStatusCommandQueueSafeEmpty();
       try:
-        if self.parallel_execution == 0:
-          command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-          self.process_command(command)
-        else:
-          # If parallel execution is enabled, just kick off all available
-          # commands using separate threads
-          while (True):
-            command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-            logger.info("Kicking off a thread for the command, id=" +
-                        str(command['commandId']) + " taskId=" + str(command['taskId']))
-            t = threading.Thread(target=self.process_command, args=(command,))
-            t.daemon = True
-            t.start()
+        command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
+        self.process_command(command)
       except (Queue.Empty):
         pass
-
   def processBackgroundQueueSafeEmpty(self):
     while not self.backgroundCommandQueue.empty():
       try:
@@ -338,9 +324,8 @@ class ActionQueue(threading.Thread):
 
   def command_was_canceled(self):
     self.customServiceOrchestrator
-
-  def on_background_command_complete_callback(self, process_condensed_result, handle):
-    logger.debug('Start callback: %s' % process_condensed_result)
+  def on_background_command_complete_callback(self, process_condenced_result, handle):
+    logger.debug('Start callback: %s' % process_condenced_result)
     logger.debug('The handle is: %s' % handle)
     status = self.COMPLETED_STATUS if handle.exitCode == 0 else self.FAILED_STATUS
 
@@ -355,10 +340,10 @@ class ActionQueue(threading.Thread):
     roleResult = self.commandStatuses.generate_report_template(handle.command)
 
     roleResult.update({
-      'stdout': process_condensed_result['stdout'] + aborted_postfix,
-      'stderr': process_condensed_result['stderr'] + aborted_postfix,
-      'exitCode': process_condensed_result['exitcode'],
-      'structuredOut': str(json.dumps(process_condensed_result['structuredOut'])) if 'structuredOut' in process_condensed_result else '',
+      'stdout': process_condenced_result['stdout'] + aborted_postfix,
+      'stderr': process_condenced_result['stderr'] + aborted_postfix,
+      'exitCode': process_condenced_result['exitcode'],
+      'structuredOut': str(json.dumps(process_condenced_result['structuredOut'])) if 'structuredOut' in process_condenced_result else '',
       'status': status,
     })
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index ffb56bd..ffaaac7 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -43,7 +43,6 @@ data_cleanup_max_age=2592000
 data_cleanup_max_size_MB = 100
 ping_port=8670
 cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache
-parallel_execution=0
 
 [services]
 
@@ -246,9 +245,6 @@ class AmbariConfig:
     else:
       return False
 
-  def get_parallel_exec_option(self):
-    return self.get('agent', 'parallel_execution', 0)
-
 
 def updateConfigServerHostname(configFile, new_host):
   # update agent config file

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index b107e3f..54738a6 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -61,6 +61,7 @@ class CustomServiceOrchestrator():
     self.tmp_dir = config.get('agent', 'prefix')
     self.exec_tmp_dir = config.get('agent', 'tmp_dir')
     self.file_cache = FileCache(config)
+    self.python_executor = PythonExecutor(self.tmp_dir, config)
     self.status_commands_stdout = os.path.join(self.tmp_dir,
                                                'status_command_stdout.txt')
     self.status_commands_stderr = os.path.join(self.tmp_dir,
@@ -95,13 +96,6 @@ class CustomServiceOrchestrator():
       else: 
         logger.warn("Unable to find pid by taskId = %s" % task_id)
 
-  def get_py_executor(self):
-    """
-    Wrapper for unit testing
-    :return:
-    """
-    return PythonExecutor(self.tmp_dir, self.config)
-
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name=None,
                  override_output_files=True, retry=False):
     """
@@ -178,11 +172,10 @@ class CustomServiceOrchestrator():
       if command.has_key('commandType') and command['commandType'] == ActionQueue.BACKGROUND_EXECUTION_COMMAND and len(filtered_py_file_list) > 1:
         raise AgentException("Background commands are supported without hooks only")
 
-      python_executor = self.get_py_executor()
       for py_file, current_base_dir in filtered_py_file_list:
         log_info_on_failure = not command_name in self.DONT_DEBUG_FAILURES_FOR_COMMANDS
         script_params = [command_name, json_path, current_base_dir]
-        ret = python_executor.run_file(py_file, script_params,
+        ret = self.python_executor.run_file(py_file, script_params,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
                                tmpstrucoutfile, logger_level, self.map_task_to_process,
                                task_id, override_output_files, handle = handle, log_info_on_failure=log_info_on_failure)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index 09be145..f215272 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -43,11 +43,11 @@ class PythonExecutor:
   used as a singleton for a concurrent execution of python scripts
   """
   NO_ERROR = "none"
+  grep = Grep()
+  event = threading.Event()
+  python_process_has_been_killed = False
 
   def __init__(self, tmpDir, config):
-    self.grep = Grep()
-    self.event = threading.Event()
-    self.python_process_has_been_killed = False
     self.tmpDir = tmpDir
     self.config = config
     pass
@@ -181,10 +181,11 @@ class PythonExecutor:
   def condenseOutput(self, stdout, stderr, retcode, structured_out):
     log_lines_count = self.config.get('heartbeat', 'log_lines_count')
     
+    grep = self.grep
     result = {
       "exitcode": retcode,
-      "stdout": self.grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
-      "stderr": self.grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
+      "stdout": grep.tail(stdout, log_lines_count) if log_lines_count else stdout,
+      "stderr": grep.tail(stderr, log_lines_count) if log_lines_count else stderr,
       "structuredOut" : structured_out
     }
     

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 6aab74e..f43d3f7 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -228,17 +228,14 @@ class TestActionQueue(TestCase):
   }
 
 
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock, get_parallel_exec_option_mock):
+                                get_mock, process_command_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
-    get_parallel_exec_option_mock.return_value = 0
-    config.get_parallel_exec_option = get_parallel_exec_option_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     time.sleep(0.1)
@@ -484,7 +481,7 @@ class TestActionQueue(TestCase):
     }
     cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
 
-    config = AmbariConfig()
+    config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -526,7 +523,7 @@ class TestActionQueue(TestCase):
     }
     cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
 
-    config = AmbariConfig()
+    config = AmbariConfig().getConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
@@ -567,7 +564,7 @@ class TestActionQueue(TestCase):
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
 
     build_mock.return_value = {'dummy report': '' }
 
@@ -603,7 +600,7 @@ class TestActionQueue(TestCase):
                                   status_update_callback):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
 
 
     requestComponentStatus_mock.reset_mock()
@@ -623,17 +620,14 @@ class TestActionQueue(TestCase):
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
 
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_reset_queue(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock, gpeo_mock):
+                                get_mock, process_command_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
-    gpeo_mock.return_value = 0
-    config.get_parallel_exec_option = gpeo_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -645,17 +639,14 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
 
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
   @patch.object(Queue, "get")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_cancel(self, CustomServiceOrchestrator_mock,
-                       get_mock, process_command_mock, gpeo_mock):
+                       get_mock, process_command_mock):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
     config = MagicMock()
-    gpeo_mock.return_value = 0
-    config.get_parallel_exec_option = gpeo_mock
     actionQueue = ActionQueue(config, dummy_controller)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
@@ -667,27 +658,6 @@ class TestActionQueue(TestCase):
     actionQueue.join()
     self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
 
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
-                         process_command_mock, gpeo_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    config = MagicMock()
-    gpeo_mock.return_value = 1
-    config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.start()
-    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
-    self.assertEqual(2, actionQueue.commandQueue.qsize())
-    time.sleep(1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertEqual(2, process_command_mock.call_count)
-    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
-
 
   @patch("time.sleep")
   @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -698,7 +668,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     python_execution_result_dict = {
       'exitcode': 1,
       'stdout': 'out',
@@ -736,7 +706,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     execution_result_fail_dict = {
       'exitcode': 1,
       'stdout': 'out',
@@ -772,7 +742,7 @@ class TestActionQueue(TestCase):
   ):
     CustomServiceOrchestrator_mock.return_value = None
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
     execution_result_succ_dict = {
       'exitcode': 0,
       'stdout': 'out',
@@ -804,7 +774,7 @@ class TestActionQueue(TestCase):
                                                          'stderr' : 'err-13'}
     
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(AmbariConfig().getConfig(), dummy_controller)
 
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
@@ -821,44 +791,39 @@ class TestActionQueue(TestCase):
     self.assertEqual(len(report['reports']),1)
 
   @not_for_platform(PLATFORM_WINDOWS)
-  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock,
-                                   get_py_executor_mock):
+  def test_execute_python_executor(self, read_stack_version_mock, resolve_script_path_mock):
     
     dummy_controller = MagicMock()
-    cfg = AmbariConfig()
+    cfg = AmbariConfig().getConfig()
     cfg.set('agent', 'tolerate_download_failures', 'true')
     cfg.set('agent', 'prefix', '.')
     cfg.set('agent', 'cache_dir', 'background_tasks')
     
     actionQueue = ActionQueue(cfg, dummy_controller)
-    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
-    patch_output_file(pyex)
-    get_py_executor_mock.return_value = pyex
+    patch_output_file(actionQueue.customServiceOrchestrator.python_executor)
     actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
    
     result = {}
     lock = threading.RLock()
     complete_done = threading.Condition(lock)
     
-    def command_complete_w(process_condensed_result, handle):
+    def command_complete_w(process_condenced_result, handle):
       with lock:
-        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
+        result['command_complete'] = {'condenced_result' : copy.copy(process_condenced_result), 
                                       'handle' : copy.copy(handle),
                                       'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
                                       }
         complete_done.notifyAll()
-
-    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
-                                                                 None, command_complete_w)
+    
+    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
     actionQueue.processStatusCommandQueueSafeEmpty();
     
     with lock:
-      complete_done.wait(0.1)
+      complete_done.wait(.1)
       
       finished_status = result['command_complete']['command_status']
       self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index 2fb2ae5..a9e604d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -389,21 +389,19 @@ class TestCustomServiceOrchestrator(TestCase):
   from ambari_agent.StackVersionsFileHandler import StackVersionsFileHandler
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch("ambari_commons.shell.kill_process_with_children")
   @patch.object(FileCache, "__init__")
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
   @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
   @patch.object(StackVersionsFileHandler, "read_stack_version")
-  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock,
-                                    resolve_script_path_mock, FileCache_mock, kill_process_with_children_mock,
-                                    get_py_executor_mock):
+  def test_cancel_backgound_command(self, read_stack_version_mock, resolve_hook_script_path_mock, resolve_script_path_mock, FileCache_mock,
+                                      kill_process_with_children_mock):
     FileCache_mock.return_value = None
     FileCache_mock.cache_dir = MagicMock()
     resolve_hook_script_path_mock.return_value = None
 #     shell.kill_process_with_children = MagicMock()
     dummy_controller = MagicMock()
-    cfg = AmbariConfig()
+    cfg = AmbariConfig().getConfig()
     cfg.set('agent', 'tolerate_download_failures', 'true')
     cfg.set('agent', 'prefix', '.')
     cfg.set('agent', 'cache_dir', 'background_tasks')
@@ -421,10 +419,8 @@ class TestCustomServiceOrchestrator(TestCase):
     import TestActionQueue
     import copy
 
-    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
-    TestActionQueue.patch_output_file(pyex)
-    pyex.prepare_process_result = MagicMock()
-    get_py_executor_mock.return_value = pyex
+    TestActionQueue.patch_output_file(orchestrator.python_executor)
+    orchestrator.python_executor.prepare_process_result = MagicMock()
     orchestrator.dump_command_to_json = MagicMock()
 
     lock = threading.RLock()
@@ -602,14 +598,12 @@ class TestCustomServiceOrchestrator(TestCase):
     self.assertEqual('UNKNOWN', status)
 
 
-  @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(FileCache, "__init__")
   @patch.object(FileCache, "get_custom_actions_base_dir")
   def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
                                     FileCache_mock,
-                                    dump_command_to_json_mock,
-                                    get_py_executor_mock):
+                                    dump_command_to_json_mock):
     FileCache_mock.return_value = None
     get_custom_actions_base_dir_mock.return_value = "some path"
     _, script = tempfile.mkstemp()
@@ -631,10 +625,8 @@ class TestCustomServiceOrchestrator(TestCase):
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
 
     import TestActionQueue
-    pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
-    TestActionQueue.patch_output_file(pyex)
-    pyex.condenseOutput = MagicMock()
-    get_py_executor_mock.return_value = pyex
+    TestActionQueue.patch_output_file(orchestrator.python_executor)
+    orchestrator.python_executor.condenseOutput = MagicMock()
     orchestrator.dump_command_to_json = MagicMock()
 
     ret = orchestrator.runCommand(command, "out.txt", "err.txt")

http://git-wip-us.apache.org/repos/asf/ambari/blob/a93a3126/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
index 0c78fdb..2f13ef5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py
@@ -55,7 +55,7 @@ class TestHeartbeat(TestCase):
 
 
   def test_build(self):
-    config = AmbariConfig.AmbariConfig()
+    config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -94,7 +94,7 @@ class TestHeartbeat(TestCase):
                    'exitCode': 777}],
       'componentStatus': [{'status': 'HEALTHY', 'componentName': 'NAMENODE'}]
     }
-    config = AmbariConfig.AmbariConfig()
+    config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -110,7 +110,7 @@ class TestHeartbeat(TestCase):
 
   @patch.object(ActionQueue, "result")
   def test_build_long_result(self, result_mock):
-    config = AmbariConfig.AmbariConfig()
+    config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -205,7 +205,7 @@ class TestHeartbeat(TestCase):
   @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
   @patch.object(HostInfoLinux, 'register')
   def test_heartbeat_no_host_check_cmd_in_queue(self, register_mock):
-    config = AmbariConfig.AmbariConfig()
+    config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
@@ -232,7 +232,7 @@ class TestHeartbeat(TestCase):
   @patch.object(Hardware, "_chk_mount", new = MagicMock(return_value=True))
   @patch.object(HostInfoLinux, 'register')
   def test_heartbeat_host_check_no_cmd(self, register_mock):
-    config = AmbariConfig.AmbariConfig()
+    config = AmbariConfig.AmbariConfig().getConfig()
     config.set('agent', 'prefix', 'tmp')
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")