You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2016/04/12 00:46:22 UTC

ambari git commit: AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)

Repository: ambari
Updated Branches:
  refs/heads/trunk ca7b901e7 -> 1a7c781ce


AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1a7c781c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1a7c781c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1a7c781c

Branch: refs/heads/trunk
Commit: 1a7c781ce5340e332dc182c2c93d010cf0eec902
Parents: ca7b901
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Mon Apr 11 15:45:53 2016 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Mon Apr 11 15:45:59 2016 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |   22 +-
 .../test/python/ambari_agent/TestActionQueue.py |   48 +-
 .../python/ambari_agent/TestActionQueue.py.orig | 1158 ++++++++++++++++++
 3 files changed, 1221 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index d603566..ccae62c 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -156,11 +156,23 @@ class ActionQueue(threading.Thread):
           # commands using separate threads
           while (True):
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
-            logger.info("Kicking off a thread for the command, id=" +
-                        str(command['commandId']) + " taskId=" + str(command['taskId']))
-            t = threading.Thread(target=self.process_command, args=(command,))
-            t.daemon = True
-            t.start()
+            # If command is not retry_enabled then do not start them in parallel
+            # checking just one command is enough as all commands for a stage is sent
+            # at the same time and retry is only enabled for initial start/install
+            retryAble = False
+            if 'command_retry_enabled' in command['commandParams']:
+              retryAble = command['commandParams']['command_retry_enabled'] == "true"
+            if retryAble:
+              logger.info("Kicking off a thread for the command, id=" +
+                          str(command['commandId']) + " taskId=" + str(command['taskId']))
+              t = threading.Thread(target=self.process_command, args=(command,))
+              t.daemon = True
+              t.start()
+            else:
+              self.process_command(command)
+              break;
+            pass
+          pass
       except (Queue.Empty):
         pass
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 8bd5ddc..2adf4ed 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -59,7 +59,26 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'hostLevelParams': {},
     'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }}
+    'configurationTags':{'global' : { 'tag': 'v1' }},
+    'commandParams': {
+      'command_retry_enabled': 'true'
+    }
+  }
+
+  datanode_install_no_retry_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {},
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }},
+    'commandParams': {
+      'command_retry_enabled': 'false'
+    }
   }
 
   datanode_auto_start_command = {
@@ -124,8 +143,11 @@ class TestActionQueue(TestCase):
     'taskId': 7,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'commandParams': {
+      'command_retry_enabled': 'true'
     }
+  }
 
   status_command = {
     "serviceName" : 'HDFS',
@@ -883,6 +905,28 @@ class TestActionQueue(TestCase):
     self.assertEqual(2, process_command_mock.call_count)
     process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
 
+  @patch("threading.Thread")
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
+                         process_command_mock, gpeo_mock, threading_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    gpeo_mock.return_value = 1
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.start()
+    time.sleep(1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(1, process_command_mock.call_count)
+    self.assertEqual(0, threading_mock.call_count)
+    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
 
   @not_for_platform(PLATFORM_LINUX)
   @patch("time.sleep")

http://git-wip-us.apache.org/repos/asf/ambari/blob/1a7c781c/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
new file mode 100644
index 0000000..8bd5ddc
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py.orig
@@ -0,0 +1,1158 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from Queue import Queue
+
+from unittest import TestCase
+from ambari_agent.LiveStatus import LiveStatus
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.AmbariConfig import AmbariConfig
+import os, errno, time, pprint, tempfile, threading
+import sys
+from threading import Thread
+import copy
+
+from mock.mock import patch, MagicMock, call
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
+from ambari_agent.PythonExecutor import PythonExecutor
+from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.RecoveryManager import RecoveryManager
+from ambari_commons import OSCheck
+from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
+
+import logging
+
+class TestActionQueue(TestCase):
+  def setUp(self):
+    # save original open() method for later use
+    self.original_open = open
+
+
+  def tearDown(self):
+    sys.stdout = sys.__stdout__
+
+  logger = logging.getLogger()
+
+  datanode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {},
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }}
+  }
+
+  datanode_auto_start_command = {
+    'commandType': 'AUTO_EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'START',
+    'commandId': '1-1',
+    'taskId': 3,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {},
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v1' }}
+  }
+
+  datanode_upgrade_command = {
+      'commandId': 17,
+      'role' : "role",
+      'taskId' : "taskId",
+      'clusterName' : "clusterName",
+      'serviceName' : "serviceName",
+      'roleCommand' : 'UPGRADE',
+      'hostname' : "localhost.localdomain",
+      'hostLevelParams': {},
+      'clusterHostInfo': "clusterHostInfo",
+      'commandType': "EXECUTION_COMMAND",
+      'configurations':{'global' : {}},
+      'roleParams': {},
+      'commandParams' :	{
+        'source_stack_version' : 'HDP-1.2.1',
+        'target_stack_version' : 'HDP-1.3.0'
+      }
+    }
+
+  namenode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'NAMENODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 4,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {}
+    }
+
+  snamenode_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'SECONDARY_NAMENODE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 5,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {}
+    }
+
+  hbase_install_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'HBASE',
+    'roleCommand': u'INSTALL',
+    'commandId': '1-1',
+    'taskId': 7,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'hostLevelParams': {}
+    }
+
+  status_command = {
+    "serviceName" : 'HDFS',
+    "commandType" : "STATUS_COMMAND",
+    "clusterName" : "",
+    "componentName" : "DATANODE",
+    'configurations':{},
+    'hostLevelParams': {}
+  }
+
+  datanode_restart_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'CUSTOM_COMMAND',
+    'commandId': '1-1',
+    'taskId': 9,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+  }
+
+  datanode_restart_command_no_clients_update = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': u'DATANODE',
+    'roleCommand': u'CUSTOM_COMMAND',
+    'commandId': '1-1',
+    'taskId': 9,
+    'clusterName': u'cc',
+    'serviceName': u'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'hostLevelParams':{'custom_command': 'RESTART'}
+  }
+
+  status_command_for_alerts = {
+    "serviceName" : 'FLUME',
+    "commandType" : "STATUS_COMMAND",
+    "clusterName" : "",
+    "componentName" : "FLUME_HANDLER",
+    'configurations':{},
+    'hostLevelParams': {}
+  }
+
+  retryable_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'INSTALL',
+    'commandId': '1-1',
+    'taskId': 19,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'script.py',
+      'command_timeout' : '600',
+      'jdk_location' : '.',
+      'service_package_folder' : '.',
+      'command_retry_enabled' : 'true',
+      'max_duration_for_retries' : '5'
+    },
+    'hostLevelParams' : {}
+  }
+
+  background_command = {
+    'commandType': 'BACKGROUND_EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'CUSTOM_COMMAND',
+    'commandId': '1-1',
+    'taskId': 19,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : { 'tag': 'v123' }},
+    'hostLevelParams':{'custom_command': 'REBALANCE_HDFS'},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'script.py',
+      'command_timeout' : '600',
+      'jdk_location' : '.',
+      'service_package_folder' : '.'
+      }
+  }
+  cancel_background_command = {
+    'commandType': 'EXECUTION_COMMAND',
+    'role': 'NAMENODE',
+    'roleCommand': 'ACTIONEXECUTE',
+    'commandId': '1-1',
+    'taskId': 20,
+    'clusterName': 'c1',
+    'serviceName': 'HDFS',
+    'configurations':{'global' : {}},
+    'configurationTags':{'global' : {}},
+    'hostLevelParams':{},
+    'commandParams' :  {
+      'script_type' : 'PYTHON',
+      'script' : 'cancel_background_task.py',
+      'before_system_hook_function' : 'fetch_bg_pid_by_taskid',
+      'jdk_location' : '.',
+      'command_timeout' : '600',
+      'service_package_folder' : '.',
+      'cancel_policy': 'SIGKILL',
+      'cancel_task_id': "19",
+      }
+  }
+
+
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
+                                get_mock, process_command_mock, get_parallel_exec_option_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    get_parallel_exec_option_mock.return_value = 0
+    config.get_parallel_exec_option = get_parallel_exec_option_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertTrue(process_command_mock.call_count > 1)
+
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch("traceback.print_exc")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(ActionQueue, "execute_status_command")
+  def test_process_command(self, execute_status_command_mock,
+                           execute_command_mock, print_exc_mock):
+    dummy_controller = MagicMock()
+    config = AmbariConfig()
+    config.set('agent', 'tolerate_download_failures', "true")
+    actionQueue = ActionQueue(config, dummy_controller)
+    execution_command = {
+      'commandType' : ActionQueue.EXECUTION_COMMAND,
+    }
+    status_command = {
+      'commandType' : ActionQueue.STATUS_COMMAND,
+    }
+    wrong_command = {
+      'commandType' : "SOME_WRONG_COMMAND",
+    }
+    # Try wrong command
+    actionQueue.process_command(wrong_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
+
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
+    # Try normal execution
+    actionQueue.process_command(execution_command)
+    self.assertTrue(execute_command_mock.called)
+    self.assertFalse(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
+
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
+
+    actionQueue.process_command(status_command)
+    self.assertFalse(execute_command_mock.called)
+    self.assertTrue(execute_status_command_mock.called)
+    self.assertFalse(print_exc_mock.called)
+
+    execute_command_mock.reset_mock()
+    execute_status_command_mock.reset_mock()
+    print_exc_mock.reset_mock()
+
+    # Try exception to check proper logging
+    def side_effect(self):
+      raise Exception("TerribleException")
+    execute_command_mock.side_effect = side_effect
+    actionQueue.process_command(execution_command)
+    self.assertTrue(print_exc_mock.called)
+
+    print_exc_mock.reset_mock()
+
+    execute_status_command_mock.side_effect = side_effect
+    actionQueue.process_command(execution_command)
+    self.assertTrue(print_exc_mock.called)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  @patch("CommandStatusDict.CommandStatusDict")
+  @patch.object(ActionQueue, "status_update_callback")
+  def test_log_execution_commands(self, status_update_callback_mock,
+                                  command_status_dict_mock,
+                                  cso_runCommand_mock):
+    custom_service_orchestrator_execution_result_dict = {
+        'stdout': 'out',
+        'stderr': 'stderr',
+        'structuredOut' : '',
+        'exitcode' : 0
+    }
+    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    config.set('logging', 'log_command_executes', 1)
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.execute_command(self.datanode_restart_command)
+    report = actionQueue.result()
+    expected = {'status': 'COMPLETED',
+                'configurationTags': {'global': {'tag': 'v123'}},
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'structuredOut': '""',
+                'roleCommand': u'CUSTOM_COMMAND',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 9,
+                'customCommand': 'RESTART',
+                'exitCode': 0}
+    # Agent caches configurationTags if custom_command RESTART completed
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(expected, report['reports'][0])
+
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch("__builtin__.open")
+  @patch.object(ActionQueue, "status_update_callback")
+  def test_auto_execute_command(self, status_update_callback_mock, open_mock):
+    # Make file read calls visible
+    def open_side_effect(file, mode):
+      if mode == 'r':
+        file_mock = MagicMock()
+        file_mock.read.return_value = "Read from " + str(file)
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+    open_mock.side_effect = open_side_effect
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+    dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, "", -1)
+
+    actionQueue = ActionQueue(config, dummy_controller)
+    unfreeze_flag = threading.Event()
+    python_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : ''
+    }
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      unfreeze_flag.wait()
+      return python_execution_result_dict
+    def patched_aq_execute_command(command):
+      # We have to perform patching for separate thread in the same thread
+      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+        runCommand_mock.side_effect = side_effect
+        actionQueue.process_command(command)
+
+    python_execution_result_dict['status'] = 'COMPLETE'
+    python_execution_result_dict['exitcode'] = 0
+    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_auto_start_command, ))
+    execution_thread.start()
+    #  check in progress report
+    # wait until ready
+    while True:
+      time.sleep(0.1)
+      if actionQueue.tasks_in_progress_or_pending():
+        break
+    # Continue command execution
+    unfreeze_flag.set()
+    # wait until ready
+    check_queue = True
+    while check_queue:
+      report = actionQueue.result()
+      if not actionQueue.tasks_in_progress_or_pending():
+        break
+      time.sleep(0.1)
+
+    self.assertEqual(len(report['reports']), 0)
+
+    ## Test failed execution
+    python_execution_result_dict['status'] = 'FAILED'
+    python_execution_result_dict['exitcode'] = 13
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_auto_start_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    #  check in progress report
+    # wait until ready
+    while check_queue:
+      report = actionQueue.result()
+      if not actionQueue.tasks_in_progress_or_pending():
+        break
+      time.sleep(0.1)
+
+    self.assertEqual(len(report['reports']), 0)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch("__builtin__.open")
+  @patch.object(ActionQueue, "status_update_callback")
+  def test_execute_command(self, status_update_callback_mock, open_mock):
+    # Make file read calls visible
+    def open_side_effect(file, mode):
+      if mode == 'r':
+        file_mock = MagicMock()
+        file_mock.read.return_value = "Read from " + str(file)
+        return file_mock
+      else:
+        return self.original_open(file, mode)
+    open_mock.side_effect = open_side_effect
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    unfreeze_flag = threading.Event()
+    python_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : ''
+      }
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      unfreeze_flag.wait()
+      return python_execution_result_dict
+    def patched_aq_execute_command(command):
+      # We have to perform patching for separate thread in the same thread
+      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+          runCommand_mock.side_effect = side_effect
+          actionQueue.execute_command(command)
+    ### Test install/start/stop command ###
+    ## Test successful execution with configuration tags
+    python_execution_result_dict['status'] = 'COMPLETE'
+    python_execution_result_dict['exitcode'] = 0
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_install_command, ))
+    execution_thread.start()
+    #  check in progress report
+    # wait until ready
+    while True:
+      time.sleep(0.1)
+      report = actionQueue.result()
+      if len(report['reports']) != 0:
+        break
+    expected = {'status': 'IN_PROGRESS',
+                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
+                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
+                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
+                'clusterName': u'cc',
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'exitCode': 777}
+    self.assertEqual(report['reports'][0], expected)
+    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+
+  # Continue command execution
+    unfreeze_flag.set()
+    # wait until ready
+    while report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+    # check report
+    configname = os.path.join(tempdir, 'config.json')
+    expected = {'status': 'COMPLETED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'structuredOut': '""',
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'configurationTags': {'global': {'tag': 'v1'}},
+                'exitCode': 0}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+    self.assertTrue(os.path.isfile(configname))
+    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
+    self.assertEqual(status_update_callback_mock.call_count, 2)
+    os.remove(configname)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+    ## Test failed execution
+    python_execution_result_dict['status'] = 'FAILED'
+    python_execution_result_dict['exitcode'] = 13
+    # We call method in a separate thread
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_install_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    #  check in progress report
+    # wait until ready
+    report = actionQueue.result()
+    while len(report['reports']) == 0 or \
+                    report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+      # check report
+    expected = {'status': 'FAILED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'structuredOut': '""',
+                'roleCommand': u'INSTALL',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 3,
+                'exitCode': 13}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+    ### Test upgrade command ###
+    python_execution_result_dict['status'] = 'COMPLETE'
+    python_execution_result_dict['exitcode'] = 0
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_upgrade_command, ))
+    execution_thread.start()
+    unfreeze_flag.set()
+    # wait until ready
+    report = actionQueue.result()
+    while len(report['reports']) == 0 or \
+                    report['reports'][0]['status'] == 'IN_PROGRESS':
+      time.sleep(0.1)
+      report = actionQueue.result()
+    # check report
+    expected = {'status': 'COMPLETED',
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': 'clusterName',
+                'structuredOut': '""',
+                'roleCommand': 'UPGRADE',
+                'serviceName': 'serviceName',
+                'role': 'role',
+                'actionId': 17,
+                'taskId': 'taskId',
+                'exitCode': 0}
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(report['reports'][0], expected)
+
+    # now should not have reports (read complete/failed reports are deleted)
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']), 0)
+
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  @patch("CommandStatusDict.CommandStatusDict")
+  @patch.object(ActionQueue, "status_update_callback")
+  def test_store_configuration_tags(self, status_update_callback_mock,
+                                    command_status_dict_mock,
+                                    cso_runCommand_mock):
+    custom_service_orchestrator_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : '',
+      'exitcode' : 0
+    }
+    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.execute_command(self.datanode_restart_command)
+    report = actionQueue.result()
+    expected = {'status': 'COMPLETED',
+                'configurationTags': {'global': {'tag': 'v123'}},
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'structuredOut': '""',
+                'roleCommand': u'CUSTOM_COMMAND',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 9,
+                'customCommand': 'RESTART',
+                'exitCode': 0}
+    # Agent caches configurationTags if custom_command RESTART completed
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(expected, report['reports'][0])
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(ActualConfigHandler, "write_client_components")
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  @patch("CommandStatusDict.CommandStatusDict")
+  @patch.object(ActionQueue, "status_update_callback")
+  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
+                                    command_status_dict_mock,
+                                    cso_runCommand_mock, write_client_components_mock):
+    custom_service_orchestrator_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : '',
+      'exitcode' : 0
+    }
+    cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict
+
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
+    report = actionQueue.result()
+    expected = {'status': 'COMPLETED',
+                'configurationTags': {'global': {'tag': 'v123'}},
+                'stderr': 'stderr',
+                'stdout': 'out',
+                'clusterName': u'cc',
+                'structuredOut': '""',
+                'roleCommand': u'CUSTOM_COMMAND',
+                'serviceName': u'HDFS',
+                'role': u'DATANODE',
+                'actionId': '1-1',
+                'taskId': 9,
+                'customCommand': 'RESTART',
+                'exitCode': 0}
+    # Agent caches configurationTags if custom_command RESTART completed
+    self.assertEqual(len(report['reports']), 1)
+    self.assertEqual(expected, report['reports'][0])
+    self.assertFalse(write_client_components_mock.called)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(LiveStatus, "build")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
+                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
+                                  requestComponentStatus_mock,
+                                  status_update_callback):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+    build_mock.return_value = {'dummy report': '' }
+
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+    actionQueue.execute_status_command(self.status_command)
+    report = actionQueue.result()
+    expected = {'dummy report': '',
+                'securityState' : 'UNKNOWN'}
+
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
+
+  @patch.object(RecoveryManager, "command_exists")
+  @patch.object(RecoveryManager, "requires_recovery")
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(LiveStatus, "build")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_status_command_recovery(self, CustomServiceOrchestrator_mock,
+                                  build_mock, execute_command_mock, requestComponentSecurityState_mock,
+                                  requestComponentStatus_mock,
+                                  status_update_callback, requires_recovery_mock,
+                                  command_exists_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+    build_mock.return_value = {'dummy report': '' }
+    requires_recovery_mock.return_value = True
+    command_exists_mock.return_value = False
+
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
+
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+    actionQueue.execute_status_command(self.status_command)
+    report = actionQueue.result()
+    expected = {'dummy report': '',
+                'securityState' : 'UNKNOWN',
+                'sendExecCmdDet': 'True'}
+
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
+
+    requires_recovery_mock.return_value = True
+    command_exists_mock.return_value = True
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {'exitcode': 0 }
+
+    requestComponentSecurityState_mock.reset_mock()
+    requestComponentSecurityState_mock.return_value = 'UNKNOWN'
+
+    actionQueue.execute_status_command(self.status_command)
+    report = actionQueue.result()
+    expected = {'dummy report': '',
+                'securityState' : 'UNKNOWN',
+                'sendExecCmdDet': 'False'}
+
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertEqual(report['componentStatus'][0], expected)
+    self.assertTrue(requestComponentStatus_mock.called)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(ActionQueue, "status_update_callback")
+  @patch.object(CustomServiceOrchestrator, "requestComponentStatus")
+  @patch.object(CustomServiceOrchestrator, "requestComponentSecurityState")
+  @patch.object(ActionQueue, "execute_command")
+  @patch.object(LiveStatus, "build")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_status_command_with_alerts(self, CustomServiceOrchestrator_mock,
+                                              requestComponentSecurityState_mock,
+                                  build_mock, execute_command_mock,
+                                  requestComponentStatus_mock,
+                                  status_update_callback):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+
+    requestComponentStatus_mock.reset_mock()
+    requestComponentStatus_mock.return_value = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'err',
+      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
+    }
+    build_mock.return_value = {'somestatusresult': 'aresult'}
+
+    actionQueue.execute_status_command(self.status_command_for_alerts)
+
+    report = actionQueue.result()
+
+    self.assertTrue(requestComponentStatus_mock.called)
+    self.assertEqual(len(report['componentStatus']), 1)
+    self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_reset_queue(self, CustomServiceOrchestrator_mock,
+                                get_mock, process_command_mock, gpeo_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
+    config = MagicMock()
+    gpeo_mock.return_value = 0
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
+    actionQueue.reset()
+    self.assertTrue(actionQueue.commandQueue.empty())
+    self.assertFalse(actionQueue.tasks_in_progress_or_pending())
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_cancel(self, CustomServiceOrchestrator_mock,
+                       get_mock, process_command_mock, gpeo_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    gpeo_mock.return_value = 0
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.reset()
+    self.assertTrue(actionQueue.commandQueue.empty())
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+  @patch.object(AmbariConfig, "get_parallel_exec_option")
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_parallel_exec(self, CustomServiceOrchestrator_mock,
+                         process_command_mock, gpeo_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    gpeo_mock.return_value = 1
+    config.get_parallel_exec_option = gpeo_mock
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.start()
+    time.sleep(1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(2, process_command_mock.call_count)
+    process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
+
+
+  @not_for_platform(PLATFORM_LINUX)
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command(self, CustomServiceOrchestrator_mock,
+                                     sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    python_execution_result_dict = {
+      'exitcode': 1,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'FAILED'
+    }
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      return python_execution_result_dict
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+      runCommand_mock.side_effect = side_effect
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertEqual(3, runCommand_mock.call_count)
+    self.assertEqual(2, sleep_mock.call_count)
+    sleep_mock.assert_has_calls([call(2), call(3)], False)
+    runCommand_mock.assert_has_calls([
+      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
+      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True),
+      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
+
+
+  @patch("time.time")
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command_with_time_lapse(self, CustomServiceOrchestrator_mock,
+                                     sleep_mock, time_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    python_execution_result_dict = {
+      'exitcode': 1,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'FAILED'
+    }
+
+    times_arr = [8, 10, 14, 18, 22]
+    if self.logger.isEnabledFor(logging.INFO):
+      times_arr.insert(0, 4)
+    time_mock.side_effect = times_arr
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      return python_execution_result_dict
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+      runCommand_mock.side_effect = side_effect
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertEqual(2, runCommand_mock.call_count)
+    self.assertEqual(1, sleep_mock.call_count)
+    sleep_mock.assert_has_calls([call(2)], False)
+    runCommand_mock.assert_has_calls([
+      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
+      call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
+           os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
+
+  #retryable_command
+  @not_for_platform(PLATFORM_LINUX)
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command_fail_and_succeed(self, CustomServiceOrchestrator_mock,
+                                                      sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    execution_result_fail_dict = {
+      'exitcode': 1,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'FAILED'
+    }
+    execution_result_succ_dict = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'COMPLETED'
+    }
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+      runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertEqual(2, runCommand_mock.call_count)
+    self.assertEqual(1, sleep_mock.call_count)
+    sleep_mock.assert_any_call(2)
+
+  @not_for_platform(PLATFORM_LINUX)
+  @patch("time.sleep")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_retryable_command_succeed(self, CustomServiceOrchestrator_mock,
+                                             sleep_mock
+  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    execution_result_succ_dict = {
+      'exitcode': 0,
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut': '',
+      'status': 'COMPLETED'
+    }
+
+    command = copy.deepcopy(self.retryable_command)
+    with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+      runCommand_mock.side_effect = [execution_result_succ_dict]
+      actionQueue.execute_command(command)
+
+    #assert that python executor start
+    self.assertTrue(runCommand_mock.called)
+    self.assertFalse(sleep_mock.called)
+    self.assertEqual(1, runCommand_mock.call_count)
+
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_execute_background_command(self, CustomServiceOrchestrator_mock,
+                                  runCommand_mock,
+                                  ):
+    CustomServiceOrchestrator_mock.return_value = None
+    CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
+                                                         'stdout': 'out-11',
+                                                         'stderr' : 'err-13'}
+    
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+
+    execute_command = copy.deepcopy(self.background_command)
+    actionQueue.put([execute_command])
+    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
+    
+    #assert that python execturor start
+    self.assertTrue(runCommand_mock.called)
+    runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
+    self.assertTrue(runningCommand is not None)
+    self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
+    
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']),1)
+
+  @patch.object(CustomServiceOrchestrator, "get_py_executor")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  def test_execute_python_executor(self, resolve_script_path_mock,
+                                   get_py_executor_mock):
+    
+    dummy_controller = MagicMock()
+    cfg = AmbariConfig()
+    cfg.set('agent', 'tolerate_download_failures', 'true')
+    cfg.set('agent', 'prefix', '.')
+    cfg.set('agent', 'cache_dir', 'background_tasks')
+    
+    actionQueue = ActionQueue(cfg, dummy_controller)
+    pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
+    patch_output_file(pyex)
+    get_py_executor_mock.return_value = pyex
+    actionQueue.customServiceOrchestrator.dump_command_to_json = MagicMock()
+   
+    result = {}
+    lock = threading.RLock()
+    complete_done = threading.Condition(lock)
+    
+    def command_complete_w(process_condensed_result, handle):
+      with lock:
+        result['command_complete'] = {'condensed_result' : copy.copy(process_condensed_result),
+                                      'handle' : copy.copy(handle),
+                                      'command_status' : actionQueue.commandStatuses.get_command_status(handle.command['taskId'])
+                                      }
+        complete_done.notifyAll()
+
+    actionQueue.on_background_command_complete_callback = wraped(actionQueue.on_background_command_complete_callback,
+                                                                 None, command_complete_w)
+    actionQueue.put([self.background_command])
+    actionQueue.processBackgroundQueueSafeEmpty();
+    actionQueue.processStatusCommandQueueSafeEmpty();
+    
+    with lock:
+      complete_done.wait(0.1)
+      
+      finished_status = result['command_complete']['command_status']
+      self.assertEqual(finished_status['status'], ActionQueue.COMPLETED_STATUS)
+      self.assertEqual(finished_status['stdout'], 'process_out')
+      self.assertEqual(finished_status['stderr'], 'process_err')
+      self.assertEqual(finished_status['exitCode'], 0)
+      
+    
+    runningCommand = actionQueue.commandStatuses.current_state.get(self.background_command['taskId'])
+    self.assertTrue(runningCommand is not None)
+    
+    report = actionQueue.result()
+    self.assertEqual(len(report['reports']),1)
+    self.assertEqual(report['reports'][0]['stdout'],'process_out')
+#    self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+    
+    
+  
+  cancel_background_command = {
+    "commandType":"CANCEL_COMMAND",
+    "role":"AMBARI_SERVER_ACTION",
+    "roleCommand":"ABORT",
+    "commandId":"2--1",
+    "taskId":20,
+    "clusterName":"c1",
+    "serviceName":"",
+    "hostname":"c6401",
+    "roleParams":{
+      "cancelTaskIdTargets":"13,14"
+    },
+  }
+
+def patch_output_file(pythonExecutor):
+  def windows_py(command, tmpout, tmperr):
+    proc = MagicMock()
+    proc.pid = 33
+    proc.returncode = 0
+    with tmpout:
+      tmpout.write('process_out')
+    with tmperr:
+      tmperr.write('process_err')
+    return proc
+  def open_subprocess_files_win(fout, ferr, f):
+    return MagicMock(), MagicMock()
+  def read_result_from_files(out_path, err_path, structured_out_path):
+    return 'process_out', 'process_err', '{"a": "b."}'
+  pythonExecutor.launch_python_subprocess = windows_py
+  pythonExecutor.open_subprocess_files = open_subprocess_files_win
+  pythonExecutor.read_result_from_files = read_result_from_files
+
+def wraped(func, before = None, after = None):
+    def wrapper(*args, **kwargs):
+      if(before is not None):
+        before(*args, **kwargs)
+      ret =  func(*args, **kwargs)
+      if(after is not None):
+        after(*args, **kwargs)
+      return ret
+    return wrapper   
+