You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/05/19 10:22:43 UTC

[2/2] ambari git commit: AMBARI-21056. Run execution commands sent to /user/commands (aonishuk)

AMBARI-21056. Run execution commands sent to /user/commands (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c4c2ec79
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c4c2ec79
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c4c2ec79

Branch: refs/heads/branch-3.0-perf
Commit: c4c2ec79b69a5474ef262ba90ba427baacd80ae7
Parents: 2eb7844
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Fri May 19 13:22:40 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Fri May 19 13:22:40 2017 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |    2 +
 .../src/main/python/ambari_agent/ActionQueue.py |   61 +-
 .../main/python/ambari_agent/AmbariConfig.py    |    1 +
 .../python/ambari_agent/CommandStatusDict.py    |   48 +-
 .../ambari_agent/CommandStatusReporter.py       |   54 +
 .../ambari_agent/ComponentStatusExecutor.py     |    1 +
 .../src/main/python/ambari_agent/Constants.py   |    1 +
 .../ambari_agent/CustomServiceOrchestrator.py   |    6 +-
 .../main/python/ambari_agent/HeartbeatThread.py |    4 +-
 .../python/ambari_agent/InitializerModule.py    |    6 +
 .../main/python/ambari_agent/PythonExecutor.py  |    5 +-
 .../listeners/CommandsEventListener.py          |   54 +
 .../listeners/ConfigurationEventListener.py     |    4 +-
 .../python/ambari_agent/listeners/__init__.py   |    6 +-
 .../src/main/python/ambari_agent/main.py        |    6 +
 .../ambari_agent/TestAgentStompResponses.py     |   35 +-
 .../dummy_files/stomp/execution_commands.json   |   12 +-
 .../resources/Ambari-DDL-AzureDB-CREATE.sql     | 2175 ++++++++++++++++++
 18 files changed, 2402 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini
index 441a01d..609f0fa 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -36,6 +36,8 @@ run_as_user=root
 parallel_execution=0
 alert_grace_period=5
 status_command_timeout=5
+; Interval (in seconds) between periodic command output reports. 0 - don't report command output periodically (reduces bandwidth on big clusters)
+command_reports_interval=5
 alert_kinit_timeout=14400000
 system_resource_overrides=/etc/resource_overrides
 ; memory_threshold_soft_mb=400

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 1eda5c2..e9a3045 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -18,7 +18,6 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 import Queue
-import multiprocessing
 
 import logging
 import traceback
@@ -28,11 +27,11 @@ import os
 import ambari_simplejson as json
 import time
 import signal
+import copy
 
 from AgentException import AgentException
 from LiveStatus import LiveStatus
 from ActualConfigHandler import ActualConfigHandler
-from CommandStatusDict import CommandStatusDict
 from CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle
 from ambari_commons.str_utils import split_on_chunks
@@ -73,40 +72,34 @@ class ActionQueue(threading.Thread):
   COMPLETED_STATUS = 'COMPLETED'
   FAILED_STATUS = 'FAILED'
 
-  def __init__(self, config, controller):
+  def __init__(self, initializer_module):
     super(ActionQueue, self).__init__()
     self.commandQueue = Queue.Queue()
-    self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor.
     self.backgroundCommandQueue = Queue.Queue()
-    self.commandStatuses = CommandStatusDict(callback_action =
-      self.status_update_callback)
-    self.config = config
-    self.controller = controller
+    self.commandStatuses = initializer_module.commandStatuses
+    self.configurations_cache = initializer_module.configurations_cache
+    self.config = initializer_module.ambariConfig
     self.configTags = {}
-    self._stop = threading.Event()
-    self.tmpdir = config.get('agent', 'prefix')
-    self.customServiceOrchestrator = CustomServiceOrchestrator(config, controller)
-    self.parallel_execution = config.get_parallel_exec_option()
+    self.stop_event = initializer_module.stop_event
+    self.tmpdir = self.config.get('agent', 'prefix')
+    self.customServiceOrchestrator = CustomServiceOrchestrator(self.config)
+    self.parallel_execution = self.config.get_parallel_exec_option()
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
     self.lock = threading.Lock()
 
-  def stop(self):
-    self._stop.set()
-
-  def stopped(self):
-    return self._stop.isSet()
-
-  def put_status(self, commands):
-    self.controller.statusCommandsExecutor.put_commands(commands)
-
   def put(self, commands):
     for command in commands:
       if not command.has_key('serviceName'):
         command['serviceName'] = "null"
       if not command.has_key('clusterName'):
         command['clusterName'] = 'null'
-
+      
+      if command.has_key('clusterId'):
+        cluster_id = command['clusterId']
+        # TODO STOMP: what if has no configs yet?
+        if cluster_id != 'null':
+          command['configurations'] = dict(self.configurations_cache[str(cluster_id)])
       logger.info("Adding " + command['commandType'] + " for role " + \
                   command['role'] + " for service " + \
                   command['serviceName'] + " of cluster " + \
@@ -144,9 +137,8 @@ class ActionQueue(threading.Thread):
 
   def run(self):
     try:
-      while not self.stopped():
+      while not self.stop_event.is_set():
         self.processBackgroundQueueSafeEmpty()
-        self.controller.get_status_commands_executor().process_results() # process status commands
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -154,7 +146,7 @@ class ActionQueue(threading.Thread):
           else:
             # If parallel execution is enabled, just kick off all available
             # commands using separate threads
-            while (True):
+            while not self.stop_event.is_set():
               command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
               # If command is not retry_enabled then do not start them in parallel
               # checking just one command is enough as all commands for a stage is sent
@@ -203,12 +195,14 @@ class ActionQueue(threading.Thread):
     try:
       if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
         try:
-          if self.controller.recovery_manager.enabled():
-            self.controller.recovery_manager.start_execution_command()
+          # TODO STOMP: fix recovery manager for execution commands
+          #if self.controller.recovery_manager.enabled():
+          #  self.controller.recovery_manager.start_execution_command()
           self.execute_command(command)
         finally:
-          if self.controller.recovery_manager.enabled():
-            self.controller.recovery_manager.stop_execution_command()
+          pass
+          #if self.controller.recovery_manager.enabled():
+          #  self.controller.recovery_manager.stop_execution_command()
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:
@@ -380,6 +374,8 @@ class ActionQueue(threading.Thread):
 
     # let recovery manager know the current state
     if status == self.COMPLETED_STATUS:
+      # TODO STOMP:fix recovery_manager
+      """
       if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
           and self.controller.recovery_manager.configured_for_recovery(command['role']):
         if command['roleCommand'] == self.ROLE_COMMAND_START:
@@ -400,7 +396,7 @@ class ActionQueue(threading.Thread):
             self.controller.recovery_manager.update_config_staleness(command['role'], False)
             logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command['role'] + " to " +
                          self.controller.recovery_manager.get_current_status(command['role']) )
-      pass
+      """
 
       # let ambari know that configuration tags were applied
       configHandler = ActualConfigHandler(self.config, self.configTags)
@@ -437,6 +433,8 @@ class ActionQueue(threading.Thread):
         roleResult['configurationTags'] = configHandler.read_actual_component(
             command['role'])
     elif status == self.FAILED_STATUS:
+      # TODO STOMP: recovery manager
+      """
       if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
               and self.controller.recovery_manager.configured_for_recovery(command['role']):
         if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
@@ -444,6 +442,7 @@ class ActionQueue(threading.Thread):
           logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
                       ", current state of " + command['role'] + " to " +
                       self.controller.recovery_manager.get_current_status(command['role']))
+      """
 
     self.commandStatuses.put_command_status(command, roleResult)
 
@@ -504,6 +503,7 @@ class ActionQueue(threading.Thread):
     '''
     Executes commands of type STATUS_COMMAND
     '''
+    # TODO STOMP: review if we need to run this with new status commands
     try:
       command, component_status_result = result
       cluster = command['clusterName']
@@ -515,6 +515,7 @@ class ActionQueue(threading.Thread):
       else:
         globalConfig = {}
 
+      # TODO STOMP: check why we need this
       if not Script.config :
         logger.debug('Setting Script.config to last status command configuration')
         Script.config = command

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index fe48870..3d480ca 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -46,6 +46,7 @@ ping_port=8670
 cache_dir={ps}tmp
 parallel_execution=0
 system_resource_overrides={ps}etc{ps}resource_overrides
+tolerate_download_failures=false
 
 [services]
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 7a97f3f..bb0cea3 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -18,12 +18,14 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 
-import ambari_simplejson as json
 import logging
 import threading
 import copy
+import json
 from Grep import Grep
 
+from ambari_agent import Constants
+
 logger = logging.getLogger()
 
 class CommandStatusDict():
@@ -34,46 +36,29 @@ class CommandStatusDict():
     task_id -> (command, cmd_report)
   """
 
-  def __init__(self, callback_action):
+  def __init__(self, initializer_module):
     """
     callback_action is called every time when status of some command is
     updated
     """
     self.current_state = {} # Contains all statuses
-    self.callback_action = callback_action
     self.lock = threading.RLock()
+    self.initializer_module = initializer_module
 
 
   def put_command_status(self, command, new_report):
     """
     Stores new version of report for command (replaces previous)
     """
-    if 'taskId' in command:
-      key = command['taskId']
-      status_command = False
-    else: # Status command reports has no task id
-      key = id(command)
-      status_command = True
+    key = command['taskId']
     with self.lock: # Synchronized
       self.current_state[key] = (command, new_report)
-    if not status_command:
-      self.callback_action()
 
-  def update_command_status(self, command, delta):
-    """
-    Updates status of command without replacing (overwrites with delta value)
-    """
-    if 'taskId' in command:
-      key = command['taskId']
-      status_command = False
-    else: # Status command reports has no task id
-      key = id(command)
-      status_command = True
-    with self.lock: # Synchronized
-      self.current_state[key][1].update(delta)
-    if not status_command:
-      self.callback_action()
-  
+    self.force_update_to_server([new_report])
+
+  def force_update_to_server(self, reports):
+    self.initializer_module.connection.send(body=json.dumps(reports), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+
   def get_command_status(self, taskId):
     with self.lock:
       c = copy.copy(self.current_state[taskId][1])
@@ -88,7 +73,6 @@ class CommandStatusDict():
     from ActionQueue import ActionQueue
     with self.lock: # Synchronized
       resultReports = []
-      resultComponentStatus = []
       for key, item in self.current_state.items():
         command = item[0]
         report = item[1]
@@ -100,19 +84,11 @@ class CommandStatusDict():
           else:
             in_progress_report = self.generate_in_progress_report(command, report)
             resultReports.append(in_progress_report)
-        elif command ['commandType'] == ActionQueue.STATUS_COMMAND:
-          resultComponentStatus.append(report)
-          # Component status is useful once, removing it
-          del self.current_state[key]
         elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
           logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
           del self.current_state[key]
           pass
-      result = {
-        'reports': resultReports,
-        'componentStatus': resultComponentStatus
-      }
-      return result
+      return resultReports
 
 
   def generate_in_progress_report(self, command, report):

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
new file mode 100644
index 0000000..acee3b1
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import json
+import logging
+import threading
+
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class CommandStatusReporter(threading.Thread):
+  def __init__(self, initializer_module):
+    self.initializer_module = initializer_module
+    self.commandStatuses = initializer_module.commandStatuses
+    self.stop_event = initializer_module.stop_event
+    self.command_reports_interval = initializer_module.command_reports_interval
+    threading.Thread.__init__(self)
+
+  def run(self):
+    """
+    Run a loop which reports all the command results (IN_PROGRESS, FAILED, COMPLETED) every self.command_reports_interval seconds until the stop event is set. Returns immediately if the interval is 0.
+    """
+    if self.command_reports_interval == 0:
+      return
+
+    while not self.stop_event.is_set():
+      try:
+        # TODO STOMP: what if not registered?
+        report = self.commandStatuses.generate_report()
+        if report:
+          self.initializer_module.connection.send(body=json.dumps(report), destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT)
+        self.stop_event.wait(self.command_reports_interval)
+      except:
+        logger.exception("Exception in CommandStatusReporter. Re-running it")
+        pass
+    logger.info("CommandStatusReporter has successfully finished")

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index a3798c6..1f6a7dc 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -88,6 +88,7 @@ class ComponentStatusExecutor(threading.Thread):
                 logging.info("Status for {0} has changed to {1}".format(component_name, status))
                 cluster_reports[cluster_id].append(result)
 
+        # TODO STOMP: what if not registered?
         self.send_updates_to_server(cluster_reports)
         self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
       except:

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 3fbb485..6a054cc 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -28,6 +28,7 @@ SERVER_RESPONSES_TOPIC = '/user/'
 TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC]
 
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
+COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
 
 HEARTBEAT_ENDPOINT = '/heartbeat'
 REGISTRATION_ENDPOINT = '/register'

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 8b8a8f9..656e9a1 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -78,7 +78,7 @@ class CustomServiceOrchestrator():
   # Property name for credential store class path
   CREDENTIAL_STORE_CLASS_PATH_NAME = 'credentialStoreClassPath'
 
-  def __init__(self, config, controller):
+  def __init__(self, config):
     self.config = config
     self.tmp_dir = config.get('agent', 'prefix')
     self.force_https_protocol = config.get_force_https_protocol()
@@ -89,8 +89,8 @@ class CustomServiceOrchestrator():
     self.status_commands_stderr = os.path.join(self.tmp_dir,
                                                'status_command_stderr.txt')
     self.public_fqdn = hostname.public_hostname(config)
-    # cache reset will be called on every agent registration
-    controller.registration_listeners.append(self.file_cache.reset)
+    # TODO STOMP: cache reset should be called on every agent registration
+    #controller.registration_listeners.append(self.file_cache.reset)
 
     # Construct the hadoop credential lib JARs path
     self.credential_shell_lib_path = os.path.join(config.get('security', 'credential_lib_dir',

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 57748a0..70fe7e7 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -29,6 +29,7 @@ from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListen
 from ambari_agent.listeners.TopologyEventListener import TopologyEventListener
 from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener
 from ambari_agent.listeners.MetadataEventListener import MetadataEventListener
+from ambari_agent.listeners.CommandsEventListener import CommandsEventListener
 
 HEARTBEAT_INTERVAL = 10
 
@@ -49,10 +50,11 @@ class HeartbeatThread(threading.Thread):
 
     # listeners
     self.server_responses_listener = ServerResponsesListener()
+    self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
     self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
-    self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
+    self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
 
   def run(self):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index e1b4ed7..c36bd68 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -28,6 +28,8 @@ from ambari_agent.ClusterTopologyCache import ClusterTopologyCache
 from ambari_agent.ClusterMetadataCache import ClusterMetadataCache
 from ambari_agent.Utils import lazy_property
 from ambari_agent.security import AmbariStompConnection
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.CommandStatusDict import CommandStatusDict
 
 logger = logging.getLogger()
 
@@ -52,6 +54,7 @@ class InitializerModule:
     self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port')
 
     self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
+    self.command_reports_interval = int(self.ambariConfig.get('agent', 'command_reports_interval', default='5'))
     self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
 
   def init(self):
@@ -64,6 +67,9 @@ class InitializerModule:
     self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir)
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
 
+    self.commandStatuses = CommandStatusDict(self)
+    self.action_queue = ActionQueue(self)
+
   @lazy_property
   def connection(self):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index ea6f895..6008f39 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -127,8 +127,9 @@ class PythonExecutor(object):
     """
     Log some useful information after task failure.
     """
-    logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
-    log_process_information(logger)
+    pass
+    #logger.info("Command " + pprint.pformat(pythonCommand) + " failed with exitcode=" + str(result['exitcode']))
+    #log_process_information(logger)
 
   def prepare_process_result(self, returncode, tmpoutfile, tmperrfile, tmpstructedoutfile, timeout=None):
     out, error, structured_out = self.read_result_from_files(tmpoutfile, tmperrfile, tmpstructedoutfile)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
new file mode 100644
index 0000000..b851443
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import ambari_stomp
+
+from ambari_agent.listeners import EventListener
+from ambari_agent import Constants
+
+logger = logging.getLogger(__name__)
+
+class CommandsEventListener(EventListener):
+  """
+  Listener of Constants.COMMANDS_TOPIC events from server.
+  """
+  def __init__(self, action_queue):
+    self.action_queue = action_queue
+
+  def on_event(self, headers, message):
+    """
+    Is triggered when an event to Constants.COMMANDS_TOPIC topic is received from server.
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    """
+    commands = []
+    for cluster_id in message.keys():
+      cluster_dict = message[cluster_id]
+      host_level_params = cluster_dict['hostLevelParams']
+      for command in cluster_dict['commands']:
+        command['hostLevelParams'] = host_level_params
+        commands.append(command)
+
+    self.action_queue.put(commands)
+
+  def get_handled_path(self):
+    return Constants.COMMANDS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index 722ec3c..20b42e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -31,7 +31,7 @@ class ConfigurationEventListener(EventListener):
   Listener of Constants.CONFIGURATIONS_TOPIC events from server.
   """
   def __init__(self, configuration_cache):
-    self.topology_cache = configuration_cache
+    self.configuration_cache = configuration_cache
 
   def on_event(self, headers, message):
     """
@@ -40,7 +40,7 @@ class ConfigurationEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    self.topology_cache.update_cache(message)
+    self.configuration_cache.update_cache(message)
 
   def get_handled_path(self):
     return Constants.CONFIGURATIONS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 2b7e9bc..45b38ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -48,8 +48,10 @@ class EventListener(ambari_stomp.ConnectionListener):
 
       logger.info("Received event from {0}".format(destination))
       logger.debug("Received event from {0}: headers={1} ; message={2}".format(destination, headers, message))
-
-      self.on_event(headers, message_json)
+      try:
+        self.on_event(headers, message_json)
+      except:
+        logger.exception("Exception while handing event from {0}: headers={1} ; message={2}".format(destination, headers, message))
 
   def on_event(self, headers, message):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 29eb926..72d6c70 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -112,6 +112,7 @@ from resource_management.core.logger import Logger
 from ambari_agent import HeartbeatThread
 from ambari_agent.InitializerModule import InitializerModule
 from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
 
 logger = logging.getLogger()
 alerts_logger = logging.getLogger('ambari_alerts')
@@ -361,6 +362,11 @@ def run_threads():
   component_status_executor = ComponentStatusExecutor(initializer_module)
   component_status_executor.start()
 
+  command_status_reporter = CommandStatusReporter(initializer_module)
+  command_status_reporter.start()
+
+  initializer_module.action_queue.start()
+
   while not initializer_module.stop_event.is_set():
     time.sleep(0.1)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index cab8fe1..9d59222 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -29,19 +29,29 @@ from BaseStompServerTestCase import BaseStompServerTestCase
 from ambari_agent import HeartbeatThread
 from ambari_agent.InitializerModule import InitializerModule
 from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
+from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 
 from mock.mock import MagicMock, patch
 
 class TestAgentStompResponses(BaseStompServerTestCase):
-  def test_mock_server_can_start(self):
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  def test_mock_server_can_start(self, runCommand_mock):
+    runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
     self.init_stdout_logger()
 
     self.remove(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
 
+    if not os.path.exists("/tmp/ambari-agent"):
+      os.mkdir("/tmp/ambari-agent")
+
     initializer_module = InitializerModule()
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
     heartbeat_thread.start()
 
+    action_queue = initializer_module.action_queue
+    action_queue.start()
+
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
     commands_subscribe_frame = self.server.frames_queue.get()
@@ -53,6 +63,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
 
+    command_status_reporter = CommandStatusReporter(initializer_module)
+    command_status_reporter.start()
+
     status_reports_frame = self.server.frames_queue.get()
 
     # server sends registration response
@@ -72,6 +85,12 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.server.topic_manager.send(f)
 
     heartbeat_frame = self.server.frames_queue.get()
+    dn_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    dn_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+    zk_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    zk_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+    action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
     initializer_module.stop_event.set()
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=json.dumps({'heartbeat-response':'true'}))
@@ -79,10 +98,16 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     heartbeat_thread.join()
     component_status_executor.join()
+    command_status_reporter.join()
+    action_queue.join()
 
     self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org')
     self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
     self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
+    self.assertEquals(dn_status_in_progress_frame[0]['roleCommand'], 'START')
+    self.assertEquals(dn_status_in_progress_frame[0]['role'], 'DATANODE')
+    self.assertEquals(dn_status_in_progress_frame[0]['status'], 'IN_PROGRESS')
+    self.assertEquals(dn_status_failed_frame[0]['status'], 'FAILED')
 
     """
     ============================================================================================
@@ -95,6 +120,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
     heartbeat_thread.start()
 
+    action_queue = initializer_module.action_queue
+    action_queue.start()
+
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
     commands_subscribe_frame = self.server.frames_queue.get()
@@ -107,6 +135,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
 
+    command_status_reporter = CommandStatusReporter(initializer_module)
+    command_status_reporter.start()
+
     status_reports_frame = self.server.frames_queue.get()
 
     self.assertEquals(clusters_hashes['metadata_hash'], '21724f6ffa7aff0fe91a0c0c5b765dba')
@@ -125,6 +156,8 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
     heartbeat_thread.join()
     component_status_executor.join()
+    command_status_reporter.join()
+    action_queue.join()
 
   def remove(self, filepathes):
     for filepath in filepathes:

http://git-wip-us.apache.org/repos/asf/ambari/blob/c4c2ec79/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index bf54b97..525af5c 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -32,10 +32,13 @@
       {
         "requestId":5,
         "taskId":9,
+        "commandId":1,
         "serviceName":"HDFS",
         "role":"DATANODE",
         "commandType":"EXECUTION_COMMAND",
         "roleCommand":"START",
+        "clusterName": "c1",
+        "clusterId": 0,
         "configuration_credentials":{
 
         },
@@ -45,7 +48,7 @@
           "script":"scripts/datanode.py",
           "phase":"INITIAL_START",
           "max_duration_for_retries":"600",
-          "command_retry_enabled":"true",
+          "command_retry_enabled":"false",
           "command_timeout":"1200",
           "refresh_topology":"True",
           "script_type":"PYTHON"
@@ -54,10 +57,13 @@
       {
         "requestId":6,
         "taskId":9,
+        "commandId":0,
+        "clusterId": "null",
         "serviceName":"ZOOKEEPER",
         "role":"ZOOKEEPER_SERVER",
         "commandType":"EXECUTION_COMMAND",
         "roleCommand":"START",
+        "clusterName": "c1",
         "configuration_credentials":{
 
         },
@@ -67,7 +73,7 @@
           "script":"scripts/datanode.py",
           "phase":"INITIAL_START",
           "max_duration_for_retries":"600",
-          "command_retry_enabled":"true",
+          "command_retry_enabled":"false",
           "command_timeout":"1200",
           "refresh_topology":"True",
           "script_type":"PYTHON"
@@ -88,6 +94,8 @@
         "role":"check_host",
         "commandType":"EXECUTION_COMMAND",
         "taskId":2,
+        "commandId":1,
+        "clusterId": "null",
         "commandParams":{
           "script":"check_host.py",
           "check_execute_list":"host_resolution_check",