Posted to commits@ambari.apache.org by ao...@apache.org on 2018/01/22 16:12:35 UTC

[ambari] 03/04: AMBARI-22810. Rewrite TestActionQueue (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch branch-3.0-perf
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit de56e9bb5f07ad8f7eca198beaa689c367b98a5a
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Thu Jan 18 15:17:34 2018 +0200

    AMBARI-22810. Rewrite TestActionQueue (aonishuk)
---
 .../src/main/python/ambari_agent/ActionQueue.py    |  10 -
 .../test/python/ambari_agent/TestActionQueue.py    | 878 ++++++++++-----------
 2 files changed, 413 insertions(+), 475 deletions(-)
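The rewritten tests follow one construction and teardown pattern throughout the diff below: ActionQueue is built from an InitializerModule instead of a (config, controller) pair, it is stopped through the shared stop_event, and command reports are read per cluster id. A minimal sketch of that pattern, assuming the InitializerModule API the patch imports (illustrative only, not code from the commit):

    # Illustrative sketch; names are taken from the patch below, and the
    # InitializerModule behaviour is assumed rather than excerpted.
    from ambari_agent.ActionQueue import ActionQueue
    from ambari_agent.InitializerModule import InitializerModule

    CLUSTER_ID = '0'

    initializer_module = InitializerModule()
    initializer_module.init()                       # sets up config, caches and stop_event

    action_queue = ActionQueue(initializer_module)  # old form: ActionQueue(config, dummy_controller)
    action_queue.start()
    # ... enqueue commands with action_queue.put([...]) ...

    initializer_module.stop_event.set()             # replaces actionQueue.stop()
    action_queue.join()
    assert not action_queue.is_alive()              # replaces actionQueue.stopped()

    # reports are now grouped by cluster id instead of a flat 'reports' list
    reports = action_queue.commandStatuses.generate_report()[CLUSTER_ID]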

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index cba0a65..16c6c14 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -471,16 +471,6 @@ class ActionQueue(threading.Thread):
 
     self.commandStatuses.put_command_status(handle.command, roleResult)
 
-  def execute_status_command_and_security_status(self, command):
-    component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
-    return command, component_status_result
-
-  def status_update_callback(self):
-    """
-    Actions that are executed every time when command status changes
-    """
-    self.controller.trigger_heartbeat()
-
   # Removes all commands from the queue
   def reset(self):
     queue = self.commandQueue
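The dominant mechanical change in the test diff that follows is the report shape: assertions move from actionQueue.result()['reports'] to actionQueue.commandStatuses.generate_report()[CLUSTER_ID]. A toy illustration of the cluster-keyed shape, inferred from the assertions below rather than taken from the commit:

    # generate_report() is assumed to return {clusterId: [command reports...]}
    CLUSTER_ID = '0'
    generated = {CLUSTER_ID: [{'status': 'COMPLETED', 'taskId': 9, 'exitCode': 0}]}
    reports = generated[CLUSTER_ID]
    assert len(reports) == 1 and reports[0]['status'] == 'COMPLETED'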
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index faa9b81..666d3c9 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -36,9 +36,15 @@ from ambari_agent.ActualConfigHandler import ActualConfigHandler
 from ambari_agent.RecoveryManager import RecoveryManager
 from ambari_commons import OSCheck
 from only_for_platform import not_for_platform, os_distro_value, PLATFORM_WINDOWS, PLATFORM_LINUX
+from ambari_agent.InitializerModule import InitializerModule
+
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
+default_run_command = CustomServiceOrchestrator.runCommand
 
 import logging
 
+CLUSTER_ID = '0'
+
 class TestActionQueue(TestCase):
   def setUp(self):
     # save original open() method for later use
@@ -63,7 +69,8 @@ class TestActionQueue(TestCase):
     'configurationTags':{'global' : { 'tag': 'v1' }},
     'commandParams': {
       'command_retry_enabled': 'true'
-    }
+    },
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_install_no_retry_command = {
@@ -79,7 +86,8 @@ class TestActionQueue(TestCase):
     'configurationTags':{'global' : { 'tag': 'v1' }},
     'commandParams': {
       'command_retry_enabled': 'false'
-    }
+    },
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_auto_start_command = {
@@ -92,7 +100,8 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'hostLevelParams': {},
     'configurations':{'global' : {}},
-    'configurationTags':{'global' : { 'tag': 'v1' }}
+    'configurationTags':{'global' : { 'tag': 'v1' }},
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_upgrade_command = {
@@ -111,7 +120,8 @@ class TestActionQueue(TestCase):
       'commandParams' :	{
         'source_stack_version' : 'HDP-1.2.1',
         'target_stack_version' : 'HDP-1.3.0'
-      }
+      },
+      'clusterId': CLUSTER_ID,
     }
 
   namenode_install_command = {
@@ -122,7 +132,8 @@ class TestActionQueue(TestCase):
     'taskId': 4,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'clusterId': CLUSTER_ID,
     }
 
   snamenode_install_command = {
@@ -133,7 +144,8 @@ class TestActionQueue(TestCase):
     'taskId': 5,
     'clusterName': u'cc',
     'serviceName': u'HDFS',
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'clusterId': CLUSTER_ID,
     }
 
   hbase_install_command = {
@@ -147,7 +159,8 @@ class TestActionQueue(TestCase):
     'hostLevelParams': {},
     'commandParams': {
       'command_retry_enabled': 'true'
-    }
+    },
+    'clusterId': CLUSTER_ID,
   }
 
   status_command = {
@@ -156,7 +169,8 @@ class TestActionQueue(TestCase):
     "clusterName" : "",
     "componentName" : "DATANODE",
     'configurations':{},
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_restart_command = {
@@ -169,7 +183,8 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'configurations':{'global' : {}},
     'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+    'hostLevelParams':{'custom_command': 'RESTART', 'clientsToUpdateConfigs': []},
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_restart_command_no_logging = {
@@ -185,7 +200,8 @@ class TestActionQueue(TestCase):
     'commandParams': {
       'log_output': 'false'
     },
-    'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []}
+    'hostLevelParams': {'custom_command': 'RESTART', 'clientsToUpdateConfigs': []},
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_restart_command_no_clients_update = {
@@ -198,10 +214,12 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'configurations':{'global' : {}},
     'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'RESTART'}
+    'hostLevelParams':{'custom_command': 'RESTART'},
+    'clusterId': CLUSTER_ID,
   }
 
   datanode_start_custom_command = {
+    'clusterId': CLUSTER_ID,
     'commandType': 'EXECUTION_COMMAND',
     'role': u'DATANODE',
     'roleCommand': u'CUSTOM_COMMAND',
@@ -211,7 +229,8 @@ class TestActionQueue(TestCase):
     'serviceName': u'HDFS',
     'configurations':{'global' : {}},
     'configurationTags':{'global' : { 'tag': 'v123' }},
-    'hostLevelParams':{'custom_command': 'START'}
+    'hostLevelParams':{'custom_command': 'START'},
+    'clusterId': CLUSTER_ID,
   }
 
   yarn_refresh_queues_custom_command = {
@@ -225,7 +244,8 @@ class TestActionQueue(TestCase):
     'commandParams' : {'forceRefreshConfigTags' : 'capacity-scheduler'},
     'configurations':{'global' : {}},
     'configurationTags':{'global' : { 'tag': 'v123' }, 'capacity-scheduler' : {'tag': 'v123'}},
-    'hostLevelParams':{'custom_command': 'REFRESHQUEUES'}
+    'hostLevelParams':{'custom_command': 'REFRESHQUEUES'},
+    'clusterId': CLUSTER_ID,
   }
 
   status_command_for_alerts = {
@@ -234,7 +254,8 @@ class TestActionQueue(TestCase):
     "clusterName" : "",
     "componentName" : "FLUME_HANDLER",
     'configurations':{},
-    'hostLevelParams': {}
+    'hostLevelParams': {},
+    'clusterId': CLUSTER_ID,
   }
 
   retryable_command = {
@@ -256,7 +277,8 @@ class TestActionQueue(TestCase):
       'command_retry_enabled' : 'true',
       'max_duration_for_retries' : '5'
     },
-    'hostLevelParams' : {}
+    'hostLevelParams' : {},
+    'clusterId': CLUSTER_ID,
   }
 
   background_command = {
@@ -276,7 +298,8 @@ class TestActionQueue(TestCase):
       'command_timeout' : '600',
       'jdk_location' : '.',
       'service_package_folder' : '.'
-      }
+      },
+      'clusterId': CLUSTER_ID,
   }
   cancel_background_command = {
     'commandType': 'EXECUTION_COMMAND',
@@ -298,7 +321,8 @@ class TestActionQueue(TestCase):
       'service_package_folder' : '.',
       'cancel_policy': 'SIGKILL',
       'cancel_task_id': "19",
-      }
+      },
+      'clusterId': CLUSTER_ID,
   }
 
 
@@ -313,23 +337,31 @@ class TestActionQueue(TestCase):
     config = MagicMock()
     get_parallel_exec_option_mock.return_value = 0
     config.get_parallel_exec_option = get_parallel_exec_option_mock
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.start()
     time.sleep(0.1)
-    actionQueue.stop()
+    initializer_module.stop_event.set()
     actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
     self.assertTrue(process_command_mock.call_count > 1)
 
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch("logging.RootLogger.exception")
   @patch.object(ActionQueue, "execute_command")
   def test_process_command(self, execute_command_mock, log_exc_mock):
     dummy_controller = MagicMock()
     config = AmbariConfig()
     config.set('agent', 'tolerate_download_failures', "true")
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     execution_command = {
       'commandType' : ActionQueue.EXECUTION_COMMAND,
     }
@@ -370,12 +402,10 @@ class TestActionQueue(TestCase):
     self.assertTrue(log_exc_mock.called)
 
   @patch.object(ActionQueue, "log_command_output")
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_log_execution_commands(self, status_update_callback_mock,
-                                  command_status_dict_mock,
+  def test_log_execution_commands(self, command_status_dict_mock,
                                   cso_runCommand_mock, mock_log_command_output):
     custom_service_orchestrator_execution_result_dict = {
         'stdout': 'out',
@@ -392,35 +422,36 @@ class TestActionQueue(TestCase):
     config.set('agent', 'tolerate_download_failures', "true")
     config.set('logging', 'log_command_executes', 1)
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    initializer_module.config = config
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.datanode_restart_command)
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
+                'clusterId': CLUSTER_ID,
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'RESTART',
                 'exitCode': 0}
     # Agent caches configurationTags if custom_command RESTART completed
     mock_log_command_output.assert_has_calls([call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
 
 
   @patch.object(ActionQueue, "log_command_output")
   @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_do_not_log_execution_commands(self, status_update_callback_mock,
-                                         command_status_dict_mock,
+  def test_do_not_log_execution_commands(self, command_status_dict_mock,
                                          cso_runCommand_mock, mock_log_command_output):
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
@@ -437,59 +468,62 @@ class TestActionQueue(TestCase):
     config.set('agent', 'tolerate_download_failures', "true")
     config.set('logging', 'log_command_executes', 1)
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.datanode_restart_command_no_logging)
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
+                'clusterId': CLUSTER_ID,
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'RESTART',
                 'exitCode': 0}
     # Agent caches configurationTags if custom_command RESTART completed
     mock_log_command_output.assert_not_called(
       [call("out\n\nCommand completed successfully!\n", "9"), call("stderr", "9")], any_order=True)
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("__builtin__.open")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_auto_execute_command(self, status_update_callback_mock, open_mock):
-    # Make file read calls visible
-    def open_side_effect(file, mode):
-      if mode == 'r':
-        file_mock = MagicMock()
-        file_mock.read.return_value = "Read from " + str(file)
-        return file_mock
-      else:
-        return self.original_open(file, mode)
-    open_mock.side_effect = open_side_effect
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
+
 
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  def test_auto_execute_command(self):
     config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-    dummy_controller.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "", -1)
-
-    actionQueue = ActionQueue(config, dummy_controller)
-    unfreeze_flag = threading.Event()
-    python_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : ''
-    }
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    initializer_module.config = config
+    initializer_module.recovery_manager = RecoveryManager(tempfile.mktemp())
+    initializer_module.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "")
+
+    with patch("__builtin__.open") as open_mock:
+      # Make file read calls visible
+      def open_side_effect(file, mode):
+        if mode == 'r':
+          file_mock = MagicMock()
+          file_mock.read.return_value = "Read from " + str(file)
+          return file_mock
+        else:
+          return self.original_open(file, mode)
+      open_mock.side_effect = open_side_effect
+      actionQueue = ActionQueue(initializer_module)
+      unfreeze_flag = threading.Event()
+      python_execution_result_dict = {
+        'stdout': 'out',
+        'stderr': 'stderr',
+        'structuredOut' : ''
+      }
 
     def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
       unfreeze_flag.wait()
@@ -504,206 +538,201 @@ class TestActionQueue(TestCase):
     python_execution_result_dict['exitcode'] = 0
     self.assertFalse(actionQueue.tasks_in_progress_or_pending())
     # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_auto_start_command, ))
+    execution_thread = Thread(target=patched_aq_execute_command ,
+                              args=(self.datanode_auto_start_command,))
     execution_thread.start()
     #  check in progress report
     # wait until ready
     while True:
       time.sleep(0.1)
-      if actionQueue.tasks_in_progress_or_pending():
+      if actionQueue.commandStatuses.current_state:
         break
     # Continue command execution
     unfreeze_flag.set()
     # wait until ready
     check_queue = True
     while check_queue:
-      report = actionQueue.result()
-      if not actionQueue.tasks_in_progress_or_pending():
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      if actionQueue.commandStatuses.current_state:
         break
       time.sleep(0.1)
 
-    self.assertEqual(len(report['reports']), 0)
+    self.assertEqual(len(reports), 0)
 
-    ## Test failed execution
+    # # Test failed execution
     python_execution_result_dict['status'] = 'FAILED'
     python_execution_result_dict['exitcode'] = 13
     # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_auto_start_command, ))
+    execution_thread = Thread(target=patched_aq_execute_command ,
+                              args=(self.datanode_auto_start_command,))
     execution_thread.start()
     unfreeze_flag.set()
-    #  check in progress report
-    # wait until ready
-    while check_queue:
-      report = actionQueue.result()
-      if not actionQueue.tasks_in_progress_or_pending():
-        break
-      time.sleep(0.1)
-
-    self.assertEqual(len(report['reports']), 0)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch("__builtin__.open")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_execute_command(self, status_update_callback_mock, open_mock):
-    # Make file read calls visible
-    def open_side_effect(file, mode):
-      if mode == 'r':
-        file_mock = MagicMock()
-        file_mock.read.return_value = "Read from " + str(file)
-        return file_mock
-      else:
-        return self.original_open(file, mode)
-    open_mock.side_effect = open_side_effect
 
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
+  # @patch("__builtin__.open")
+  def test_execute_command(self):
     config = AmbariConfig()
     tempdir = tempfile.gettempdir()
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    unfreeze_flag = threading.Event()
-    python_execution_result_dict = {
-      'stdout': 'out',
-      'stderr': 'stderr',
-      'structuredOut' : ''
-      }
-
-    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
-      unfreeze_flag.wait()
-      return python_execution_result_dict
-    def patched_aq_execute_command(command):
-      # We have to perform patching for separate thread in the same thread
-      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
-          runCommand_mock.side_effect = side_effect
-          actionQueue.execute_command(command)
-    ### Test install/start/stop command ###
-    ## Test successful execution with configuration tags
-    python_execution_result_dict['status'] = 'COMPLETE'
-    python_execution_result_dict['exitcode'] = 0
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_install_command, ))
-    execution_thread.start()
-    #  check in progress report
-    # wait until ready
-    while True:
-      time.sleep(0.1)
-      report = actionQueue.result()
-      if len(report['reports']) != 0:
-        break
-    expected = {'status': 'IN_PROGRESS',
-                'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
-                'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
-                'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
-                'clusterName': u'cc',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'exitCode': 777}
-    self.assertEqual(report['reports'][0], expected)
-    self.assertTrue(actionQueue.tasks_in_progress_or_pending())
-
-  # Continue command execution
-    unfreeze_flag.set()
-    # wait until ready
-    while report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
-    # check report
-    configname = os.path.join(tempdir, 'config.json')
-    expected = {'status': 'COMPLETED',
-                'stderr': 'stderr',
-                'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'configurationTags': {'global': {'tag': 'v1'}},
-                'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-    self.assertTrue(os.path.isfile(configname))
-    # Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
-    self.assertEqual(status_update_callback_mock.call_count, 2)
-    os.remove(configname)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
-
-    ## Test failed execution
-    python_execution_result_dict['status'] = 'FAILED'
-    python_execution_result_dict['exitcode'] = 13
-    # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_install_command, ))
-    execution_thread.start()
-    unfreeze_flag.set()
-    #  check in progress report
-    # wait until ready
-    report = actionQueue.result()
-    while len(report['reports']) == 0 or \
-                    report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    initializer_module.config = config
+    
+    with patch("__builtin__.open") as open_mock:
+      # Make file read calls visible
+      def open_side_effect(file, mode):
+        if mode == 'r':
+          file_mock = MagicMock()
+          file_mock.read.return_value = "Read from " + str(file)
+          return file_mock
+        else:
+          return self.original_open(file, mode)
+      open_mock.side_effect = open_side_effect
+      
+      actionQueue = ActionQueue(initializer_module)
+      unfreeze_flag = threading.Event()
+      python_execution_result_dict = {
+        'stdout': 'out',
+        'stderr': 'stderr',
+        'structuredOut' : ''
+        }
+  
+      def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+        unfreeze_flag.wait()
+        return python_execution_result_dict
+      def patched_aq_execute_command(command):
+        # We have to perform patching for separate thread in the same thread
+        with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+            runCommand_mock.side_effect = side_effect
+            actionQueue.execute_command(command)
+      ### Test install/start/stop command ###
+      # # Test successful execution with configuration tags
+      python_execution_result_dict['status'] = 'COMPLETE'
+      python_execution_result_dict['exitcode'] = 0
+      # We call method in a separate thread
+      execution_thread = Thread(target=patched_aq_execute_command ,
+                                args=(self.datanode_install_command,))
+      execution_thread.start()
+      #  check in progress report
+      # wait until ready
+      while True:
+        time.sleep(0.1)
+        reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+        if len(reports) != 0:
+          break
+      expected = {'status': 'IN_PROGRESS',
+                  'stderr': 'Read from {0}'.format(os.path.join(tempdir, "errors-3.txt")),
+                  'stdout': 'Read from {0}'.format(os.path.join(tempdir, "output-3.txt")),
+                  'structuredOut' : 'Read from {0}'.format(os.path.join(tempdir, "structured-out-3.json")),
+                  'clusterId': CLUSTER_ID,
+                  'roleCommand': u'INSTALL',
+                  'serviceName': u'HDFS',
+                  'role': u'DATANODE',
+                  'actionId': '1-1',
+                  'taskId': 3,
+                  'exitCode': 777}
+      self.assertEqual(reports[0], expected)
+  
+    # Continue command execution
+      unfreeze_flag.set()
+      # wait until ready
+      while reports[0]['status'] == 'IN_PROGRESS':
+        time.sleep(0.1)
+        reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+  
       # check report
-    expected = {'status': 'FAILED',
-                'stderr': 'stderr',
-                'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
-                'clusterName': u'cc',
-                'structuredOut': '""',
-                'roleCommand': u'INSTALL',
-                'serviceName': u'HDFS',
-                'role': u'DATANODE',
-                'actionId': '1-1',
-                'taskId': 3,
-                'exitCode': 13}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
-
-    ### Test upgrade command ###
-    python_execution_result_dict['status'] = 'COMPLETE'
-    python_execution_result_dict['exitcode'] = 0
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_upgrade_command, ))
-    execution_thread.start()
-    unfreeze_flag.set()
-    # wait until ready
-    report = actionQueue.result()
-    while len(report['reports']) == 0 or \
-                    report['reports'][0]['status'] == 'IN_PROGRESS':
-      time.sleep(0.1)
-      report = actionQueue.result()
-    # check report
-    expected = {'status': 'COMPLETED',
-                'stderr': 'stderr',
-                'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
-                'clusterName': 'clusterName',
-                'structuredOut': '""',
-                'roleCommand': 'UPGRADE',
-                'serviceName': 'serviceName',
-                'role': 'role',
-                'actionId': 17,
-                'taskId': 'taskId',
-                'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(report['reports'][0], expected)
-
-    # now should not have reports (read complete/failed reports are deleted)
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']), 0)
+      expected = {'status': 'COMPLETED',
+                  'stderr': 'stderr',
+                  'stdout': 'out\n\nCommand completed successfully!\n',
+                  'clusterId': CLUSTER_ID,
+                  'structuredOut': '""',
+                  'roleCommand': u'INSTALL',
+                  'serviceName': u'HDFS',
+                  'role': u'DATANODE',
+                  'actionId': '1-1',
+                  'taskId': 3,
+                  'exitCode': 0}
+      self.assertEqual(len(reports), 1)
+      self.assertEqual(reports[0], expected)
+  
+      # now should not have reports (read complete/failed reports are deleted)
+      actionQueue.commandStatuses.clear_reported_reports()
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      self.assertEqual(len(reports), 0)
+  
+      # # Test failed execution
+      python_execution_result_dict['status'] = 'FAILED'
+      python_execution_result_dict['exitcode'] = 13
+      # We call method in a separate thread
+      execution_thread = Thread(target=patched_aq_execute_command ,
+                                args=(self.datanode_install_command,))
+      execution_thread.start()
+      unfreeze_flag.set()
+      #  check in progress report
+      # wait until ready
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      while len(reports) == 0 or \
+                      reports[0]['status'] == 'IN_PROGRESS':
+        time.sleep(0.1)
+        reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+        actionQueue.commandStatuses.clear_reported_reports()
+        
+        # check report
+      expected = {'status': 'FAILED',
+                  'stderr': 'stderr',
+                  'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n',
+                  'clusterId': CLUSTER_ID,
+                  'structuredOut': '""',
+                  'roleCommand': u'INSTALL',
+                  'serviceName': u'HDFS',
+                  'role': u'DATANODE',
+                  'actionId': '1-1',
+                  'taskId': 3,
+                  'exitCode': 13}
+      self.assertEqual(len(reports), 1)
+      self.assertEqual(reports[0], expected)
+  
+      # now should not have reports (read complete/failed reports are deleted)
+      actionQueue.commandStatuses.clear_reported_reports()
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      self.assertEqual(len(reports), 0)
+  
+      ### Test upgrade command ###
+      python_execution_result_dict['status'] = 'COMPLETE'
+      python_execution_result_dict['exitcode'] = 0
+      execution_thread = Thread(target=patched_aq_execute_command ,
+                                args=(self.datanode_upgrade_command,))
+      execution_thread.start()
+      unfreeze_flag.set()
+      # wait until ready
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      while len(reports) == 0 or \
+                      reports[0]['status'] == 'IN_PROGRESS':
+        time.sleep(0.1)
+        reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+        actionQueue.commandStatuses.clear_reported_reports()
+      # check report
+      expected = {'status': 'COMPLETED',
+                  'stderr': 'stderr',
+                  'stdout': 'out\n\nCommand completed successfully!\n\n\nCommand failed after 1 tries\n\n\nCommand completed successfully!\n',
+                  'clusterId': CLUSTER_ID,
+                  'structuredOut': '""',
+                  'roleCommand': 'UPGRADE',
+                  'serviceName': 'serviceName',
+                  'role': 'role',
+                  'actionId': 17,
+                  'taskId': 'taskId',
+                  'exitCode': 0}
+      self.assertEqual(len(reports), 1)
+      self.assertEqual(reports[0], expected)
+  
+      # now should not have reports (read complete/failed reports are deleted)
+      actionQueue.commandStatuses.clear_reported_reports()
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      self.assertEqual(len(reports), 0)
 
   def test_cancel_with_reschedule_command(self):
     config = AmbariConfig()
@@ -712,14 +741,18 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     unfreeze_flag = threading.Event()
     python_execution_result_dict = {
       'stdout': 'out',
       'stderr': 'stderr',
       'structuredOut' : '',
       'status' : '',
-      'exitcode' : -signal.SIGTERM
+      'exitcode': -signal.SIGTERM
     }
 
     def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
@@ -732,33 +765,22 @@ class TestActionQueue(TestCase):
         actionQueue.execute_command(command)
 
     # We call method in a separate thread
-    execution_thread = Thread(target = patched_aq_execute_command ,
-                              args = (self.datanode_install_command, ))
+    execution_thread = Thread(target=patched_aq_execute_command ,
+                              args=(self.datanode_install_command,))
     execution_thread.start()
     #  check in progress report
     # wait until ready
     while True:
       time.sleep(0.1)
-      report = actionQueue.result()
-      if len(report['reports']) != 0:
+      reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+      if len(reports) != 0:
         break
 
-    unfreeze_flag.set()
-    # wait until ready
-    while len(report['reports']) != 0:
-      time.sleep(0.1)
-      report = actionQueue.result()
-
-    # check report
-    self.assertEqual(len(report['reports']), 0)
-
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_configuration_tags(self, status_update_callback_mock,
-                                    command_status_dict_mock,
+  def test_store_configuration_tags(self, command_status_dict_mock,
                                     cso_runCommand_mock):
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
@@ -774,33 +796,33 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.datanode_restart_command)
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'RESTART',
+                'clusterId': CLUSTER_ID,
                 'exitCode': 0}
     # Agent caches configurationTags if custom_command RESTART completed
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(ActualConfigHandler, "write_client_components")
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_configuration_tags_no_clients(self, status_update_callback_mock,
-                                    command_status_dict_mock,
+  def test_store_configuration_tags_no_clients(self, command_status_dict_mock,
                                     cso_runCommand_mock, write_client_components_mock):
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
@@ -816,37 +838,33 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.datanode_restart_command_no_clients_update)
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
+                'clusterId': CLUSTER_ID,
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'RESTART',
                 'exitCode': 0}
     # Agent caches configurationTags if custom_command RESTART completed
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-    self.assertFalse(write_client_components_mock.called)
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActualConfigHandler, "write_client_components")
-  @patch.object(ActualConfigHandler, "write_actual_component")
-  @patch.object(ActualConfigHandler, "update_component_tag")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_refresh_queues_custom_command(self, status_update_callback_mock,
-                                                            command_status_dict_mock,
-                                                            cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock):
+  def test_refresh_queues_custom_command(self, command_status_dict_mock,
+                                                            cso_runCommand_mock):
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
       'stderr': 'stderr',
@@ -861,38 +879,33 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.yarn_refresh_queues_custom_command)
 
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': None,
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
+                'clusterId': CLUSTER_ID,
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'YARN',
                 'role': u'RESOURCEMANAGER',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'RESTART',
                 'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
 
-    # Configuration tags should be updated
-    self.assertTrue(update_component_tag.called)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActualConfigHandler, "write_client_components")
-  @patch.object(ActualConfigHandler, "write_actual_component")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_configuration_tags_on_custom_start_command(self, status_update_callback_mock,
-                                    command_status_dict_mock,
-                                    cso_runCommand_mock, write_actual_component_mock, write_client_components_mock):
+  def test_store_configuration_tags_on_custom_start_command(self, command_status_dict_mock,
+                                    cso_runCommand_mock):
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
       'stderr': 'stderr',
@@ -906,37 +919,33 @@ class TestActionQueue(TestCase):
     config.set('agent', 'prefix', tempdir)
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.execute_command(self.datanode_start_custom_command)
-    report = actionQueue.result()
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
     expected = {'status': 'COMPLETED',
-                'configurationTags': {'global': {'tag': 'v123'}},
                 'stderr': 'stderr',
                 'stdout': 'out\n\nCommand completed successfully!\n',
-                'clusterName': u'cc',
                 'structuredOut': '""',
                 'roleCommand': u'CUSTOM_COMMAND',
                 'serviceName': u'HDFS',
                 'role': u'DATANODE',
                 'actionId': '1-1',
                 'taskId': 9,
-                'customCommand': 'START',
-                'exitCode': 0}
-    self.assertEqual(len(report['reports']), 1)
-    self.assertEqual(expected, report['reports'][0])
-
-    # Configuration tags should be updated on custom start command
-    self.assertTrue(write_actual_component_mock.called)
+                'exitCode': 0,
+                'clusterId': CLUSTER_ID
+    }
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(expected, reports[0])
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActualConfigHandler, "write_actual_component")
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch("CommandStatusDict.CommandStatusDict")
-  @patch.object(ActionQueue, "status_update_callback")
-  def test_store_config_tags_on_install_client_command(self, status_update_callback_mock,
-      command_status_dict_mock,
-      cso_runCommand_mock, write_actual_component_mock):
+  def test_store_config_tags_on_install_client_command(self, command_status_dict_mock,
+      cso_runCommand_mock):
 
     custom_service_orchestrator_execution_result_dict = {
       'stdout': 'out',
@@ -956,9 +965,10 @@ class TestActionQueue(TestCase):
       'serviceName': u'TEZ',
       'configurations': {'global' : {}},
       'configurationTags': {'global' : { 'tag': 'v123' }},
-      'hostLevelParams': {}
+      'hostLevelParams': {},
+      'clusterId': CLUSTER_ID,
     }
-    LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'}, )
+    LiveStatus.CLIENT_COMPONENTS = ({'serviceName': 'TEZ', 'componentName': 'TEZ_CLIENT'},)
 
     config = AmbariConfig()
     tempdir = tempfile.gettempdir()
@@ -966,109 +976,12 @@ class TestActionQueue(TestCase):
     config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
     config.set('agent', 'tolerate_download_failures', "true")
     dummy_controller = MagicMock()
-    actionQueue = ActionQueue(config, dummy_controller)
-    actionQueue.execute_command(tez_client_install_command)
-
-    # Configuration tags should be updated on install client command
-    self.assertTrue(write_actual_component_mock.called)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_execute_status_command(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock,
-                                  status_update_callback):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-    build_mock.return_value = {'dummy report': '' }
-
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-
-    result = (self.status_command, {'exitcode': 0 })
-
-    actionQueue.process_status_command_result(result)
-    report = actionQueue.result()
-    expected = {'dummy report': ''}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-
-  @patch.object(RecoveryManager, "command_exists")
-  @patch.object(RecoveryManager, "requires_recovery")
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_process_status_command_result_recovery(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock,
-                                  status_update_callback, requires_recovery_mock,
-                                  command_exists_mock):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-
-    build_mock.return_value = {'dummy report': '' }
-    requires_recovery_mock.return_value = True
-    command_exists_mock.return_value = False
-
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp(), True, False)
-
-    result = (self.status_command, {'exitcode': 0 })
-
-    actionQueue.process_status_command_result(result)
-    report = actionQueue.result()
-    expected = {'dummy report': '',
-                'sendExecCmdDet': 'True'}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-
-    requires_recovery_mock.return_value = True
-    command_exists_mock.return_value = True
-    
-    result = (self.status_command, {'exitcode': 0 })
-
-    actionQueue.process_status_command_result(result)
-    report = actionQueue.result()
-    expected = {'dummy report': '',
-                'sendExecCmdDet': 'False'}
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertEqual(report['componentStatus'][0], expected)
-
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
-  @patch.object(ActionQueue, "status_update_callback")
-  @patch.object(ActionQueue, "execute_command")
-  @patch.object(LiveStatus, "build")
-  @patch.object(CustomServiceOrchestrator, "__init__")
-  def test_process_status_command_result_with_alerts(self, CustomServiceOrchestrator_mock,
-                                  build_mock, execute_command_mock,
-                                  status_update_callback):
-    CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
-    command_return_value = {
-      'exitcode': 0,
-      'stdout': 'out',
-      'stderr': 'err',
-      'structuredOut': {'alerts': [ {'name': 'flume_alert'} ] }
-    }
     
-    result = (self.status_command_for_alerts, command_return_value)
+    initializer_module = InitializerModule()
+    initializer_module.init()
     
-    build_mock.return_value = {'somestatusresult': 'aresult'}
-
-    actionQueue.process_status_command_result(result)
-
-    report = actionQueue.result()
-
-    self.assertEqual(len(report['componentStatus']), 1)
-    self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+    actionQueue = ActionQueue(initializer_module)
+    actionQueue.execute_command(tez_client_install_command)
 
   @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
@@ -1082,7 +995,11 @@ class TestActionQueue(TestCase):
     config = MagicMock()
     gpeo_mock.return_value = 0
     config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
@@ -1091,9 +1008,9 @@ class TestActionQueue(TestCase):
     self.assertTrue(actionQueue.commandQueue.empty())
     self.assertFalse(actionQueue.tasks_in_progress_or_pending())
     time.sleep(0.1)
-    actionQueue.stop()
+    initializer_module.stop_event.set()
     actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
 
   @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
@@ -1102,20 +1019,28 @@ class TestActionQueue(TestCase):
   def test_cancel(self, CustomServiceOrchestrator_mock,
                        get_mock, process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    dummy_controller = MagicMock(initializer_module)
     config = MagicMock()
     gpeo_mock.return_value = 0
     config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.start()
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
     actionQueue.reset()
     self.assertTrue(actionQueue.commandQueue.empty())
     time.sleep(0.1)
-    actionQueue.stop()
+    initializer_module.stop_event.set()
     actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
 
   @patch.object(AmbariConfig, "get_parallel_exec_option")
   @patch.object(ActionQueue, "process_command")
@@ -1123,18 +1048,24 @@ class TestActionQueue(TestCase):
   def test_parallel_exec(self, CustomServiceOrchestrator_mock,
                          process_command_mock, gpeo_mock):
     CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    dummy_controller = MagicMock(initializer_module)
     config = MagicMock()
     gpeo_mock.return_value = 1
     config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.put([self.datanode_install_command, self.hbase_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
     actionQueue.start()
     time.sleep(1)
-    actionQueue.stop()
+    initializer_module.stop_event.set()
     actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
     self.assertEqual(2, process_command_mock.call_count)
     process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
 
@@ -1145,18 +1076,26 @@ class TestActionQueue(TestCase):
   def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock,
                          process_command_mock, gpeo_mock, threading_mock):
     CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    dummy_controller = MagicMock(initializer_module)
     config = MagicMock()
     gpeo_mock.return_value = 1
     config.get_parallel_exec_option = gpeo_mock
-    actionQueue = ActionQueue(config, dummy_controller)
+    
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    
+    actionQueue = ActionQueue(initializer_module)
     actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command])
     self.assertEqual(2, actionQueue.commandQueue.qsize())
     actionQueue.start()
     time.sleep(1)
-    actionQueue.stop()
+    initializer_module.stop_event.set()
     actionQueue.join()
-    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+    self.assertEqual(actionQueue.is_alive(), False, 'Action queue is not stopped.')
     self.assertEqual(2, process_command_mock.call_count)
     self.assertEqual(0, threading_mock.call_count)
     process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)])
@@ -1187,7 +1126,7 @@ class TestActionQueue(TestCase):
       runCommand_mock.side_effect = side_effect
       actionQueue.execute_command(command)
 
-    #assert that python executor start
+    # assert that python executor start
     self.assertTrue(runCommand_mock.called)
     self.assertEqual(3, runCommand_mock.call_count)
     self.assertEqual(2, sleep_mock.call_count)
@@ -1209,9 +1148,9 @@ class TestActionQueue(TestCase):
                                      sleep_mock, time_mock
   ):
     CustomServiceOrchestrator_mock.return_value = None
-    dummy_controller = MagicMock()
-    dummy_controller.recovery_manager = RecoveryManager(tempfile.mktemp())
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    actionQueue = ActionQueue(initializer_module)
     python_execution_result_dict = {
       'exitcode': 1,
       'stdout': 'out',
@@ -1233,18 +1172,18 @@ class TestActionQueue(TestCase):
       runCommand_mock.side_effect = side_effect
       actionQueue.execute_command(command)
 
-    #assert that python executor start
+    # assert that python executor start
     self.assertTrue(runCommand_mock.called)
     self.assertEqual(2, runCommand_mock.call_count)
     self.assertEqual(1, sleep_mock.call_count)
-    sleep_mock.assert_has_calls([call(1)], False)
+    sleep_mock.assert_has_calls([call(2)], False)
     runCommand_mock.assert_has_calls([
       call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
            os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=True, retry=False),
       call(command, os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'output-19.txt',
            os.sep + 'tmp' + os.sep + 'ambari-agent' + os.sep + 'errors-19.txt', override_output_files=False, retry=True)])
 
-  #retryable_command
+  # retryable_command
   @not_for_platform(PLATFORM_LINUX)
   @patch("time.sleep")
   @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
@@ -1276,7 +1215,7 @@ class TestActionQueue(TestCase):
       runCommand_mock.side_effect = [execution_result_fail_dict, execution_result_succ_dict]
       actionQueue.execute_command(command)
 
-    #assert that python executor start
+    # assert that python executor start
     self.assertTrue(runCommand_mock.called)
     self.assertEqual(2, runCommand_mock.call_count)
     self.assertEqual(1, sleep_mock.call_count)
@@ -1306,12 +1245,12 @@ class TestActionQueue(TestCase):
       runCommand_mock.side_effect = [execution_result_succ_dict]
       actionQueue.execute_command(command)
 
-    #assert that python executor start
+    # assert that python executor start
     self.assertTrue(runCommand_mock.called)
     self.assertFalse(sleep_mock.called)
     self.assertEqual(1, runCommand_mock.call_count)
 
-  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")
   @patch.object(CustomServiceOrchestrator, "__init__")
   def test_execute_background_command(self, CustomServiceOrchestrator_mock,
@@ -1321,27 +1260,29 @@ class TestActionQueue(TestCase):
     CustomServiceOrchestrator.runCommand.return_value = {'exitcode' : 0,
                                                          'stdout': 'out-11',
                                                          'stderr' : 'err-13'}
+
+    initializer_module = InitializerModule()
+    initializer_module.init()
     
-    dummy_controller = MagicMock()
-    actionQueue = ActionQueue(AmbariConfig(), dummy_controller)
+    actionQueue = ActionQueue(initializer_module)
 
     execute_command = copy.deepcopy(self.background_command)
     actionQueue.put([execute_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.controller.statusCommandExecutor.process_results();
+    # actionQueue.controller.statusCommandExecutor.process_results();
     
-    #assert that python execturor start
+    # assert that python executor starts
     self.assertTrue(runCommand_mock.called)
     runningCommand = actionQueue.commandStatuses.current_state.get(execute_command['taskId'])
     self.assertTrue(runningCommand is not None)
     self.assertEqual(runningCommand[1]['status'], ActionQueue.IN_PROGRESS_STATUS)
     
-    report = actionQueue.result()
-    self.assertEqual(len(report['reports']),1)
+    reports = actionQueue.commandStatuses.generate_report()[CLUSTER_ID]
+    self.assertEqual(len(reports), 1)
 
   @patch.object(CustomServiceOrchestrator, "get_py_executor")
   @patch.object(CustomServiceOrchestrator, "resolve_script_path")
-  def test_execute_python_executor(self, resolve_script_path_mock,
+  def __test_execute_python_executor(self, resolve_script_path_mock,
                                    get_py_executor_mock):
     
     dummy_controller = MagicMock()
@@ -1350,7 +1291,15 @@ class TestActionQueue(TestCase):
     cfg.set('agent', 'prefix', '.')
     cfg.set('agent', 'cache_dir', 'background_tasks')
     
-    actionQueue = ActionQueue(cfg, dummy_controller)
+    initializer_module = InitializerModule()
+    initializer_module.init()
+    initializer_module.config = cfg
+    initializer_module.metadata_cache.cache_update({CLUSTER_ID:{'clusterLevelParams':{}}}, 'abc')
+    initializer_module.configurations_cache.cache_update({CLUSTER_ID:{}}, 'abc')
+    initializer_module.host_level_params_cache.cache_update({CLUSTER_ID:{}}, 'abc')
+    CustomServiceOrchestrator.runCommand = default_run_command
+    
+    actionQueue = ActionQueue(initializer_module)
     pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir, actionQueue.customServiceOrchestrator.config)
     patch_output_file(pyex)
     get_py_executor_mock.return_value = pyex
@@ -1372,7 +1321,6 @@ class TestActionQueue(TestCase):
                                                                  None, command_complete_w)
     actionQueue.put([self.background_command])
     actionQueue.processBackgroundQueueSafeEmpty();
-    actionQueue.controller.statusCommandExecutor.process_results();
     
     with lock:
       complete_done.wait(0.1)
@@ -1388,9 +1336,9 @@ class TestActionQueue(TestCase):
     self.assertTrue(runningCommand is not None)
     
     report = actionQueue.result()
-    self.assertEqual(len(report['reports']),1)
-    self.assertEqual(report['reports'][0]['stdout'],'process_out')
-#    self.assertEqual(report['reports'][0]['structuredOut'],'{"a": "b."}')
+    self.assertEqual(len(reports), 1)
+    self.assertEqual(reports[0]['stdout'], 'process_out')
+#    self.assertEqual(reports[0]['structuredOut'],'{"a": "b."}')
     
     
   
@@ -1426,11 +1374,11 @@ def patch_output_file(pythonExecutor):
   pythonExecutor.open_subprocess_files = open_subprocess_files_win
   pythonExecutor.read_result_from_files = read_result_from_files
 
-def wraped(func, before = None, after = None):
+def wraped(func, before=None, after=None):
     def wrapper(*args, **kwargs):
       if(before is not None):
         before(*args, **kwargs)
-      ret =  func(*args, **kwargs)
+      ret = func(*args, **kwargs)
       if(after is not None):
         after(*args, **kwargs)
       return ret

-- 
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.