You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2016/04/12 00:48:47 UTC

ambari git commit: Revert "AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)"

Repository: ambari
Updated Branches:
  refs/heads/trunk 1a7c781ce -> 995fc0be4


Revert "AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)"

This reverts commit 1a7c781ce5340e332dc182c2c93d010cf0eec902.


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/995fc0be
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/995fc0be
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/995fc0be

Branch: refs/heads/trunk
Commit: 995fc0be488d45bd9b44e412e034565267649a1a
Parents: 1a7c781
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Mon Apr 11 15:48:30 2016 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Mon Apr 11 15:48:30 2016 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |   22 +-
 .../test/python/ambari_agent/TestActionQueue.py |   48 +-
 .../python/ambari_agent/TestActionQueue.py.orig | 1158 ------------------
 3 files changed, 7 insertions(+), 1221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index ccae62c..d603566 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -156,23 +156,11 @@ class ActionQueue(threading.Thread):
           # commands using separate threads
           while (True):
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-            # If command is not retry_enabled then do not start them in parallel
-            # checking just one command is enough as all commands for a stage is sent
-            # at the same time and retry is only enabled for initial start/install
-            retryAble = False
-            if 'command_retry_enabled' in command['commandParams']:
-              retryAble = command['commandParams']['command_retry_enabled'] == "true"
-            if retryAble:
-              logger.info("Kicking off a thread for the command, id=" +
-                          str(command['commandId']) + " taskId=" + str(command['taskId']))
-              t = threading.Thread(target=self.process_command, args=(command,))
-              t.daemon = True
-              t.start()
-            else:
-              self.process_command(command)
-              break;
-            pass
-          pass
+            logger.info("Kicking off a thread for the command, id=" +
+                        str(command['commandId']) + " taskId=" + str(command['taskId']))
+            t = threading.Thread(target=self.process_command, args=(command,))
+            t.daemon = True
+            t.start()
       except (Queue.Empty):
         pass
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 2adf4ed..8bd5ddc 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -59,26 +59,7 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'hostLevelParams': {},
     'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }},
-    'commandParams': {
-      'command_retry_enabled': 'true'
-    }
-  }
-
-  datanode_install_no_retry_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'DATANODE',
-    'roleCommand': u'INSTALL',
-    'commandId': '1-1',
-    'taskId': 3,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {},
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }},
-    'commandParams': {
-      'command_retry_enabled': 'false'
-    }
+    'configurationTags':{'global' : { 'tag': 'v1' }}
   }
 
   datanode_auto_start_command = {
@@ -143,11 +124,8 @@ class TestActionQueue(TestCase):
     'taskId': 7,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
-    'hostLevelParams': {},
-    'commandParams': {
-      'command_retry_enabled': 'true'
+    'hostLevelParams': {}
     }
-  }
 
   status_command = {
     "serviceName" : 'HDFS',
@@ -905,28 +883,6 @@ class TestActionQueue(TestCase):
     self.assertEqual(2, process_command_mock.call_count)
     process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
 
-  @patch("threading.Thread")
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
-                         process_command_mock, gpeo_mock, threading_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    config = MagicMock()
-    gpeo_mock.return_value = 1
-    config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
-    self.assertEqual(2, actionQueue.commandQueue.qsize())
-    actionQueue.start()
-    time.sleep(1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertEqual(1, process_command_mock.call_count)
-    self.assertEqual(0, threading_mock.call_count)
-    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
 
   @not_for_platform(PLATFORM_LINUX)
   @patch("time.sleep")

http://git-wip-us.apache.org/repos/asf/ambari/blob/995fc0be/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
deleted file mode 100644
index 8bd5ddc..0000000
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
+++ /dev/null
@@ -1,1158 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-'''
-from Queue import Queue
-
-from unittest import TestCase
-from ambari_agent.LiveStatus import LiveStatus
-from ambari_agent.ActionQueue import ActionQueue
-from ambari_agent.AmbariConfig import AmbariConfig
-import os, errno, time, pprint, tempfile, threading
-import sys
-from threading import Thread
-import copy
-
-from mock.mock import patch, MagicMock, call
-from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
-from ambari_agent.PythonExecutor import PythonExecutor
-from ambari_agent.ActualConfigHandler import ActualConfigHandler
-from ambari_agent.RecoveryManager import RecoveryManager
-from ambari_commons import OSCheck
-from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
-
-import logging
-
-class TestActionQueue(TestCase):
-  def setUp(self):
-    # save original open() method for later use
-    self.original_open = open
-
-
-  def tearDown(self):
-    sys.stdout = sys.__stdout__
-
-  logger = logging.getLogger()
-
-  datanode_install_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'DATANODE',
-    'roleCommand': u'INSTALL',
-    'commandId': '1-1',
-    'taskId': 3,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {},
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }}
-  }
-
-  datanode_auto_start_command = {
-    'commandType': 'AUTO_EXECUTION_COMMAND',
-    'role': u'DATANODE',
-    'roleCommand': u'START',
-    'commandId': '1-1',
-    'taskId': 3,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {},
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }}
-  }
-
-  datanode_upgrade_command = {
-      'commandId': 17,
-      'role' : "role",
-      'taskId' : "taskId",
-      'clusterName' : "clusterName",
-      'serviceName' : "serviceName",
-      'roleCommand' : 'UPGRADE',
-      'hostname' : "localhost.localdomain",
-      'hostLevelParams': {},
-      'clusterHostInfo': "clusterHostInfo",
-      'commandType': "EXECUTION_COMMAND",
-      'configurations':{'global' : {}},
-      'roleParams': {},
-      'commandParams' :	{
-        'source_stack_version' : 'HDP-1.2.1',
-        'target_stack_version' : 'HDP-1.3.0'
-      }
-    }
-
-  namenode_install_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'NAMENODE',
-    'roleCommand': u'INSTALL',
-    'commandId': '1-1',
-    'taskId': 4,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {}
-    }
-
-  snamenode_install_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'SECONDARY_NAMENODE',
-    'roleCommand': u'INSTALL',
-    'commandId': '1-1',
-    'taskId': 5,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {}
-    }
-
-  hbase_install_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'HBASE',
-    'roleCommand': u'INSTALL',
-    'commandId': '1-1',
-    'taskId': 7,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'hostLevelParams': {}
-    }
-
-  status_command = {
-    "serviceName" : 'HDFS',
-    "commandType" : "STATUS_COMMAND",
-    "clusterName" : "",
-    "componentName" : "DATANODE",
-    'configurations':{},
-    'hostLevelParams': {}
-  }
-
-  datanode_restart_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'DATANODE',
-    'roleCommand': u'CUSTOM_COMMAND',
-    'commandId': '1-1',
-    'taskId': 9,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
-  }
-
-  datanode_restart_command_no_clients_update = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': u'DATANODE',
-    'roleCommand': u'CUSTOM_COMMAND',
-    'commandId': '1-1',
-    'taskId': 9,
-    'clusterName': u'cc',
-    'serviceName': u'HDFS',
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'RESTART'}
-  }
-
-  status_command_for_alerts = {
-    "serviceName" : 'FLUME',
-    "commandType" : "STATUS_COMMAND",
-    "clusterName" : "",
-    "componentName" : "FLUME_HANDLER",
-    'configurations':{},
-    'hostLevelParams': {}
-  }
-
-  retryable_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': 'NAMENODE',
-    'roleCommand': 'INSTALL',
-    'commandId': '1-1',
-    'taskId': 19,
-    'clusterName': 'c1',
-    'serviceName': 'HDFS',
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v123' }},
-    'commandParams' :  {
-      'script_type' : 'PYTHON',
-      'script' : 'script.py',
-      'command_timeout' : '600',
-      'jdk_location' : '.',
-      'service_package_folder' : '.',
-      'command_retry_enabled' : 'true',
-      'max_duration_for_retries' : '5'
-    },
-    'hostLevelParams' : {}
-  }
-
-  background_command = {
-    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
-    'role': 'NAMENODE',
-    'roleCommand': 'CUSTOM_COMMAND',
-    'commandId': '1-1',
-    'taskId': 19,
-    'clusterName': 'c1',
-    'serviceName': 'HDFS',
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
-    'commandParams' :  {
-      'script_type' : 'PYTHON',
-      'script' : 'script.py',
-      'command_timeout' : '600',
-      'jdk_location' : '.',
-      'service_package_folder' : '.'
-      }
-  }
-  cancel_background_command = {
-    'commandType': 'EXECUTION_COMMAND',
-    'role': 'NAMENODE',
-    'roleCommand': 'ACTIONEXECUTE',
-    'commandId': '1-1',
-    'taskId': 20,
-    'clusterName': 'c1',
-    'serviceName': 'HDFS',
-    'configurations':{'global' : {}},
-    'configurationTags':{'global' : {}},
-    'hostLevelParams':{},
-    'commandParams' :  {
-      'script_type' : 'PYTHON',
-      'script' : 'cancel_background_task.py',
-      'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
-      'jdk_location' : '.',
-      'command_timeout' : '600',
-      'service_package_folder' : '.',
-      'cancel_policy': 'SIGKILL',
-      'cancel_task_id': "19",
-      }
-  }
-
-
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(Queue, "get")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock, get_parallel_exec_option_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    config = MagicMock()
-    get_parallel_exec_option_mock.return_value = 0
-    config.get_parallel_exec_option = get_parallel_exec_option_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.start()
-    time.sleep(0.1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertTrue(process_command_mock.call_count > 1)
-
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("traceback.print_exc")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(ActionQueue, "execute_status_command")
-  def test_process_command(self, execute_status_command_mock,
-                           execute_command_mock, print_exc_mock):
-    dummy_controller = MagicMock()
-    config = AmbariConfig()
-    config.set('agent', 'tolerate_download_failures', "true")
-    actionQueue = ActionQueue(config, dummy_controller)
-    execution_command = {
-      'commandType' : ActionQueue.EXECUTION_COMMAND,
-    }
-    status_command = {
-      'commandType' : ActionQueue.STATUS_COMMAND,
-    }
-    wrong_command = {
-      'commandType' : "SOME_WRONG_COMMAND",
-    }
-    # Try wrong command
-    actionQueue.process_command(wrong_command)
-    self.assertFalse(execute_command_mock.called)
-    self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
-
-    execute_command_mock.reset_mock()
-    execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
-    # Try normal execution
-    actionQueue.process_command(execution_command)
-    self.assertTrue(execute_command_mock.called)
-    self.assertFalse(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
-
-    execute_command_mock.reset_mock()
-    execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
-
-    actionQueue.process_command(status_command)
-    self.assertFalse(execute_command_mock.called)
-    self.assertTrue(execute_status_command_mock.called)
-    self.assertFalse(print_exc_mock.called)
-
-    execute_command_mock.reset_mock()
-    execute_status_command_mock.reset_mock()
-    print_exc_mock.reset_mock()
-
-    # Try exception to check proper logging
-    def side_effect(self):
-      raise Exception("TerribleException")
-    execute_command_mock.side_effect = side_effect
-    actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
-
-    print_exc_mock.reset_mock()
-
-    execute_status_command_mock.side_effect = side_effect
-    actionQueue.process_command(execution_command)
-    self.assertTrue(print_exc_mock.called)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_log_execution_commands(self, status_update_callback_mock,
-                                  command_status_dict_mock,
-                                  cso_runCommand_mock):
-    custom_service_orchestrator_execution_result_dict = {
-        'stdout': 'out',
-        'stderr': 'stderr',
-        'structuredOut' : '',
-        'exitcode' : 0
-    }
-    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
-
-    config = AmbariConfig()
-    tempdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tempdir)
-    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
-    config.set('agent', 'tolerate_download_failures', "true")
-    config.set('logging', 'log_command_executes', 1)
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.execute_command(self.datanode_restart_command)
-    report = actionQueue.result()
-    expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'CUSTOM_COMMAND',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 9,
-                'customCommand': 'RESTART',
-                'exitCode': 0}
-    # Agent caches configurationTags if custom_command RESTART completed
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("__builtin__.open")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_auto_execute_command(self, status_update_callback_mock, open_mock):
-    # Make file read calls visible
-    def open_side_effect(file, mode):
-      if mode == 'r':
-        file_mock = MagicMock()
-        file_mock.read.return_value = "Read from " + str(file)
-        return file_mock
-      else:
-        return self.original_open(file, mode)
-    open_mock.side_effect = open_side_effect
-
-    config = AmbariConfig()
-    tempdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tempdir)
-    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
-    config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-    dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", -1)
-
-    actionQueue = ActionQueue(config, dummy_controller)
-    unfreeze_flag = threading.Event()
-    python_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : ''
-    }
-
-    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
-      unfreeze_flag.wait()
-      return python_execution_result_dict
-    def patched_aq_execute_command(command):
-      # We have to perform patching for separate thread in the same thread
-      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-        runCommand_mock.side_effect = side_effect
-        actionQueue.process_command(command)
-
-    python_execution_result_dict['status'] = 'COMPLETE'
-    python_execution_result_dict['exitcode'] = 0
-    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_auto_start_command, ))
-    execution_thread.start()
-    #  check in progress report
-    # wait until ready
-    while True:
-      time.sleep(0.1)
-      if actionQueue.tasks_in_progress_or_pending():
-        break
-    # Continue command execution
-    unfreeze_flag.set()
-    # wait until ready
-    check_queue = True
-    while check_queue:
-      report = actionQueue.result()
-      if not actionQueue.tasks_in_progress_or_pending():
-        break
-      time.sleep(0.1)
-
-    self.assertEqual(len(report['reports']), 0)
-
-    ## Test failed execution
-    python_execution_result_dict['status'] = 'FAILED'
-    python_execution_result_dict['exitcode'] = 13
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_auto_start_command, ))
-    execution_thread.start()
-    unfreeze_flag.set()
-    #  check in progress report
-    # wait until ready
-    while check_queue:
-      report = actionQueue.result()
-      if not actionQueue.tasks_in_progress_or_pending():
-        break
-      time.sleep(0.1)
-
-    self.assertEqual(len(report['reports']), 0)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("__builtin__.open")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_execute_command(self, status_update_callback_mock, open_mock):
-    # Make file read calls visible
-    def open_side_effect(file, mode):
-      if mode == 'r':
-        file_mock = MagicMock()
-        file_mock.read.return_value = "Read from " + str(file)
-        return file_mock
-      else:
-        return self.original_open(file, mode)
-    open_mock.side_effect = open_side_effect
-
-    config = AmbariConfig()
-    tempdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tempdir)
-    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
-    config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    unfreeze_flag = threading.Event()
-    python_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : ''
-      }
-
-    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
-      unfreeze_flag.wait()
-      return python_execution_result_dict
-    def patched_aq_execute_command(command):
-      # We have to perform patching for separate thread in the same thread
-      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-          runCommand_mock.side_effect = side_effect
-          actionQueue.execute_command(command)
-    ### Test install/start/stop command ###
-    ## Test successful execution with configuration tags
-    python_execution_result_dict['status'] = 'COMPLETE'
-    python_execution_result_dict['exitcode'] = 0
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_install_command, ))
-    execution_thread.start()
-    #  check in progress report
-    # wait until ready
-    while True:
-      time.sleep(0.1)
-      report = actionQueue.result()
-      if len(report['reports']) != 0:
-        break
-    expected = {'status': 'IN_PROGRESS',
-                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
-                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
-                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
-                'clusterName': u'cc',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'exitCode': 777}
-    self.assertEqual(report['reports'][0], expected)
-    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
-
-  # Continue command execution
-    unfreeze_flag.set()
-    # wait until ready
-    while report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
-    # check report
-    configname = os.path.join(tempdir, 'config.json')
-    expected = {'status': 'COMPLETED',
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'configurationTags': {'global': {'tag': 'v1'}},
-                'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-    self.assertTrue(os.path.isfile(configname))
-    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
-    self.assertEqual(status_update_callback_mock.call_count, 2)
-    os.remove(configname)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
-
-    ## Test failed execution
-    python_execution_result_dict['status'] = 'FAILED'
-    python_execution_result_dict['exitcode'] = 13
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_install_command, ))
-    execution_thread.start()
-    unfreeze_flag.set()
-    #  check in progress report
-    # wait until ready
-    report = actionQueue.result()
-    while len(report['reports']) == 0 or \
-                    report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
-      # check report
-    expected = {'status': 'FAILED',
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'exitCode': 13}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
-
-    ### Test upgrade command ###
-    python_execution_result_dict['status'] = 'COMPLETE'
-    python_execution_result_dict['exitcode'] = 0
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_upgrade_command, ))
-    execution_thread.start()
-    unfreeze_flag.set()
-    # wait until ready
-    report = actionQueue.result()
-    while len(report['reports']) == 0 or \
-                    report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
-    # check report
-    expected = {'status': 'COMPLETED',
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': 'clusterName',
-                'structuredOut': '""',
-                'roleCommand': 'UPGRADE',
-                'serviceName': 'serviceName',
-                'role': 'role',
-                'actionId': 17,
-                'taskId': 'taskId',
-                'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
-
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_configuration_tags(self, status_update_callback_mock,
-                                    command_status_dict_mock,
-                                    cso_runCommand_mock):
-    custom_service_orchestrator_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : '',
-      'exitcode' : 0
-    }
-    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
-
-    config = AmbariConfig()
-    tempdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tempdir)
-    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
-    config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.execute_command(self.datanode_restart_command)
-    report = actionQueue.result()
-    expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'CUSTOM_COMMAND',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 9,
-                'customCommand': 'RESTART',
-                'exitCode': 0}
-    # Agent caches configurationTags if custom_command RESTART completed
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActualConfigHandler, "write_client_components")
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
-                                    command_status_dict_mock,
-                                    cso_runCommand_mock, write_client_components_mock):
-    custom_service_orchestrator_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : '',
-      'exitcode' : 0
-    }
-    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
-
-    config = AmbariConfig()
-    tempdir = tempfile.gettempdir()
-    config.set('agent', 'prefix', tempdir)
-    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
-    config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
-    report = actionQueue.result()
-    expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
-                'stderr': 'stderr',
-                'stdout': 'out',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'CUSTOM_COMMAND',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 9,
-                'customCommand': 'RESTART',
-                'exitCode': 0}
-    # Agent caches configurationTags if custom_command RESTART completed
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-    self.assertFalse(write_client_components_mock.called)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
-  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
-                                  requestComponentStatus_mock,
-                                  status_update_callback):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-    build_mock.return_value = {'dummy report': '' }
-
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-
-    requestComponentStatus_mock.reset_mock()
-    requestComponentStatus_mock.return_value = {'exitcode': 0 }
-
-    requestComponentSecurityState_mock.reset_mock()
-    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
-
-    actionQueue.execute_status_command(self.status_command)
-    report = actionQueue.result()
-    expected = {'dummy report': '',
-                'securityState' : 'UNKNOWN'}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-    self.assertTrue(requestComponentStatus_mock.called)
-
-  @patch.object(RecoveryManager, "command_exists")
-  @patch.object(RecoveryManager, "requires_recovery")
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
-  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
-                                  requestComponentStatus_mock,
-                                  status_update_callback, requires_recovery_mock,
-                                  command_exists_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-    build_mock.return_value = {'dummy report': '' }
-    requires_recovery_mock.return_value = True
-    command_exists_mock.return_value = False
-
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
-
-    requestComponentStatus_mock.reset_mock()
-    requestComponentStatus_mock.return_value = {'exitcode': 0 }
-
-    requestComponentSecurityState_mock.reset_mock()
-    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
-
-    actionQueue.execute_status_command(self.status_command)
-    report = actionQueue.result()
-    expected = {'dummy report': '',
-                'securityState' : 'UNKNOWN',
-                'sendExecCmdDet': 'True'}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-    self.assertTrue(requestComponentStatus_mock.called)
-
-    requires_recovery_mock.return_value = True
-    command_exists_mock.return_value = True
-    requestComponentStatus_mock.reset_mock()
-    requestComponentStatus_mock.return_value = {'exitcode': 0 }
-
-    requestComponentSecurityState_mock.reset_mock()
-    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
-
-    actionQueue.execute_status_command(self.status_command)
-    report = actionQueue.result()
-    expected = {'dummy report': '',
-                'securityState' : 'UNKNOWN',
-                'sendExecCmdDet': 'False'}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-    self.assertTrue(requestComponentStatus_mock.called)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
-  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
-                                              requestComponentSecurityState_mock,
-                                  build_mock, execute_command_mock,
-                                  requestComponentStatus_mock,
-                                  status_update_callback):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-
-    requestComponentStatus_mock.reset_mock()
-    requestComponentStatus_mock.return_value = {
-      'exitcode': 0,
-      'stdout': 'out',
-      'stderr': 'err',
-      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
-    }
-    build_mock.return_value = {'somestatusresult': 'aresult'}
-
-    actionQueue.execute_status_command(self.status_command_for_alerts)
-
-    report = actionQueue.result()
-
-    self.assertTrue(requestComponentStatus_mock.called)
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertTrue(report['componentStatus'][0].has_key('alerts'))
-
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(Queue, "get")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_reset_queue(self, CustomServiceOrchestrator_mock,
-                                get_mock, process_command_mock, gpeo_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-    config = MagicMock()
-    gpeo_mock.return_value = 0
-    config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.start()
-    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
-    self.assertEqual(2, actionQueue.commandQueue.qsize())
-    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
-    actionQueue.reset()
-    self.assertTrue(actionQueue.commandQueue.empty())
-    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
-    time.sleep(0.1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(Queue, "get")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_cancel(self, CustomServiceOrchestrator_mock,
-                       get_mock, process_command_mock, gpeo_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    config = MagicMock()
-    gpeo_mock.return_value = 0
-    config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.start()
-    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
-    self.assertEqual(2, actionQueue.commandQueue.qsize())
-    actionQueue.reset()
-    self.assertTrue(actionQueue.commandQueue.empty())
-    time.sleep(0.1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-
-  @patch.object(AmbariConfig, "get_parallel_exec_option")
-  @patch.object(ActionQueue, "process_command")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
-                         process_command_mock, gpeo_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    config = MagicMock()
-    gpeo_mock.return_value = 1
-    config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
-    self.assertEqual(2, actionQueue.commandQueue.qsize())
-    actionQueue.start()
-    time.sleep(1)
-    actionQueue.stop()
-    actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
-    self.assertEqual(2, process_command_mock.call_count)
-    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
-
-
-  @not_for_platform(PLATFORM_LINUX)
-  @patch("time.sleep")
-  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
-                                     sleep_mock
-  ):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    python_execution_result_dict = {
-      'exitcode': 1,
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut': '',
-      'status': 'FAILED'
-    }
-
-    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
-      return python_execution_result_dict
-
-    command = copy.deepcopy(self.retryable_command)
-    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-      runCommand_mock.side_effect = side_effect
-      actionQueue.execute_command(command)
-
-    #assert that python executor start
-    self.assertTrue(runCommand_mock.called)
-    self.assertEqual(3, runCommand_mock.call_count)
-    self.assertEqual(2, sleep_mock.call_count)
-    sleep_mock.assert_has_calls([call(2), call(3)], False)
-    runCommand_mock.assert_has_calls([
-      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
-           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
-      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
-           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True),
-      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
-           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
-
-
-  @patch("time.time")
-  @patch("time.sleep")
-  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
-                                     sleep_mock, time_mock
-  ):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    python_execution_result_dict = {
-      'exitcode': 1,
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut': '',
-      'status': 'FAILED'
-    }
-
-    times_arr = [8, 10, 14, 18, 22]
-    if self.logger.isEnabledFor(logging.INFO):
-      times_arr.insert(0, 4)
-    time_mock.side_effect = times_arr
-
-    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
-      return python_execution_result_dict
-
-    command = copy.deepcopy(self.retryable_command)
-    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-      runCommand_mock.side_effect = side_effect
-      actionQueue.execute_command(command)
-
-    #assert that python executor start
-    self.assertTrue(runCommand_mock.called)
-    self.assertEqual(2, runCommand_mock.call_count)
-    self.assertEqual(1, sleep_mock.call_count)
-    sleep_mock.assert_has_calls([call(2)], False)
-    runCommand_mock.assert_has_calls([
-      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
-           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
-      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
-           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
-
-  #retryable_command
-  @not_for_platform(PLATFORM_LINUX)
-  @patch("time.sleep")
-  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
-                                                      sleep_mock
-  ):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    execution_result_fail_dict = {
-      'exitcode': 1,
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut': '',
-      'status': 'FAILED'
-    }
-    execution_result_succ_dict = {
-      'exitcode': 0,
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut': '',
-      'status': 'COMPLETED'
-    }
-
-    command = copy.deepcopy(self.retryable_command)
-    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-      runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
-      actionQueue.execute_command(command)
-
-    #assert that python executor start
-    self.assertTrue(runCommand_mock.called)
-    self.assertEqual(2, runCommand_mock.call_count)
-    self.assertEqual(1, sleep_mock.call_count)
-    sleep_mock.assert_any_call(2)
-
-  @not_for_platform(PLATFORM_LINUX)
-  @patch("time.sleep")
-  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
-                                             sleep_mock
-  ):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    execution_result_succ_dict = {
-      'exitcode': 0,
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut': '',
-      'status': 'COMPLETED'
-    }
-
-    command = copy.deepcopy(self.retryable_command)
-    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-      runCommand_mock.side_effect = [execution_result_succ_dict]
-      actionQueue.execute_command(command)
-
-    #assert that python executor start
-    self.assertTrue(runCommand_mock.called)
-    self.assertFalse(sleep_mock.called)
-    self.assertEqual(1, runCommand_mock.call_count)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(CustomServiceOrchestrator, "runCommand")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
-                                  runCommand_mock,
-                                  ):
-    CustomServiceOrchestrator_mock.return_value = None
-    CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
-                                                         'stdout': 'out-11',
-                                                         'stderr' : 'err-13'}
-    
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-    execute_command = copy.deepcopy(self.background_command)
-    actionQueue.put([execute_command])
-    actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandQueueSafeEmpty();
-    
-    #assert that python execturor start
-    self.assertTrue(runCommand_mock.called)
-    runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
-    self.assertTrue(runningCommand is not None)
-    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
-    
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']),1)
-
-  @patch.object(CustomServiceOrchestrator, "get_py_executor")
-  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
-  def test_execute_python_executor(self, resolve_script_path_mock,
-                                   get_py_executor_mock):
-    
-    dummy_controller = MagicMock()
-    cfg = AmbariConfig()
-    cfg.set('agent', 'tolerate_download_failures', 'true')
-    cfg.set('agent', 'prefix', '.')
-    cfg.set('agent', 'cache_dir', 'background_tasks')
-    
-    actionQueue = ActionQueue(cfg, dummy_controller)
-    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
-    patch_output_file(pyex)
-    get_py_executor_mock.return_value = pyex
-    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
-   
-    result = {}
-    lock = threading.RLock()
-    complete_done = threading.Condition(lock)
-    
-    def command_complete_w(process_condensed_result, handle):
-      with lock:
-        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
-                                      'handle' : copy.copy(handle),
-                                      'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
-                                      }
-        complete_done.notifyAll()
-
-    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
-                                                                 None, command_complete_w)
-    actionQueue.put([self.background_command])
-    actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.processStatusCommandQueueSafeEmpty();
-    
-    with lock:
-      complete_done.wait(0.1)
-      
-      finished_status = result['command_complete']['command_status']
-      self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
-      self.assertEqual(finished_status['stdout'], 'process_out')
-      self.assertEqual(finished_status['stderr'], 'process_err')
-      self.assertEqual(finished_status['exitCode'], 0)
-      
-    
-    runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
-    self.assertTrue(runningCommand is not None)
-    
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']),1)
-    self.assertEqual(report['reports'][0]['stdout'],'process_out')
-#    self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
-    
-    
-  
-  cancel_background_command = {
-    "commandType":"CANCEL_COMMAND",
-    "role":"AMBARI_SERVER_ACTION",
-    "roleCommand":"ABORT",
-    "commandId":"2--1",
-    "taskId":20,
-    "clusterName":"c1",
-    "serviceName":"",
-    "hostname":"c6401",
-    "roleParams":{
-      "cancelTaskIdTargets":"13,14"
-    },
-  }
-
-def patch_output_file(pythonExecutor):
-  def windows_py(command, tmpout, tmperr):
-    proc = MagicMock()
-    proc.pid = 33
-    proc.returncode = 0
-    with tmpout:
-      tmpout.write('process_out')
-    with tmperr:
-      tmperr.write('process_err')
-    return proc
-  def open_subprocess_files_win(fout, ferr, f):
-    return MagicMock(), MagicMock()
-  def read_result_from_files(out_path, err_path, structured_out_path):
-    return 'process_out', 'process_err', '{"a": "b."}'
-  pythonExecutor.launch_python_subprocess = windows_py
-  pythonExecutor.open_subprocess_files = open_subprocess_files_win
-  pythonExecutor.read_result_from_files = read_result_from_files
-
-def wraped(func, before = None, after = None):
-    def wrapper(*args, **kwargs):
-      if(before is not None):
-        before(*args, **kwargs)
-      ret =  func(*args, **kwargs)
-      if(after is not None):
-        after(*args, **kwargs)
-      return ret
-    return wrapper   
-