Posted to commits@ambari.apache.org by ao...@apache.org on 2017/06/20 12:26:36 UTC

ambari git commit: AMBARI-21270. Ability to auto-start component without server intervention (aonishuk)

Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 11f16c83a -> 670a08eed


AMBARI-21270. Ability to auto-start component without server intervention (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/670a08ee
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/670a08ee
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/670a08ee

Branch: refs/heads/branch-3.0-perf
Commit: 670a08eed3c8b45b58ab729c8592d6f46af3d4f2
Parents: 11f16c8
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Jun 20 15:25:47 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Jun 20 15:25:47 2017 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 146 ++-------
 .../python/ambari_agent/CommandStatusDict.py    |   2 +-
 .../ambari_agent/ComponentStatusExecutor.py     |   5 +-
 .../main/python/ambari_agent/HeartbeatThread.py |   2 +-
 .../python/ambari_agent/InitializerModule.py    |   4 +-
 .../main/python/ambari_agent/RecoveryManager.py | 303 +++++++------------
 .../listeners/MetadataEventListener.py          |  17 +-
 .../ambari_agent/TestAgentStompResponses.py     |  23 +-
 .../dummy_files/stomp/execution_commands.json   |  10 +-
 .../stomp/metadata_after_registration.json      |  12 +-
 10 files changed, 176 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5632b5b..a470697 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,6 +76,7 @@ class ActionQueue(threading.Thread):
     self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = initializer_module.commandStatuses
     self.config = initializer_module.config
+    self.recovery_manager = initializer_module.recovery_manager
     self.configTags = {}
     self.stop_event = initializer_module.stop_event
     self.tmpdir = self.config.get('agent', 'prefix')
@@ -91,9 +92,6 @@ class ActionQueue(threading.Thread):
         command['serviceName'] = "null"
       if command.has_key('clusterId'):
         command['clusterId'] = "null"
-      if not command.has_key('clusterName'):
-        command['clusterName'] = 'null'
-
 
       logger.info("Adding " + command['commandType'] + " for role " + \
                   command['role'] + " for service " + \
@@ -134,6 +132,7 @@ class ActionQueue(threading.Thread):
     try:
       while not self.stop_event.is_set():
         self.processBackgroundQueueSafeEmpty()
+        self.fillRecoveryCommands()
         try:
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -168,6 +167,10 @@ class ActionQueue(threading.Thread):
 
     logger.info("ActionQueue thread has successfully finished")
 
+  def fillRecoveryCommands(self):
+    if not self.tasks_in_progress_or_pending():
+      self.put(self.recovery_manager.get_recovery_commands())
+
   def processBackgroundQueueSafeEmpty(self):
     while not self.backgroundCommandQueue.empty():
       try:
@@ -190,40 +193,34 @@ class ActionQueue(threading.Thread):
     try:
       if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
         try:
-          # TODO STOMP: fix recovery manager for execution commands
-          #if self.controller.recovery_manager.enabled():
-          #  self.controller.recovery_manager.start_execution_command()
+          if self.recovery_manager.enabled():
+            self.recovery_manager.on_execution_command_start()
+            self.recovery_manager.process_execution_command(command)
+
           self.execute_command(command)
         finally:
-          pass
-          #if self.controller.recovery_manager.enabled():
-          #  self.controller.recovery_manager.stop_execution_command()
+          if self.recovery_manager.enabled():
+            self.recovery_manager.on_execution_command_finish()
       else:
         logger.error("Unrecognized command " + pprint.pformat(command))
     except Exception:
       logger.exception("Exception while processing {0} command".format(commandType))
 
   def tasks_in_progress_or_pending(self):
-    return_val = False
-    if not self.commandQueue.empty():
-      return_val = True
-    if self.controller.recovery_manager.has_active_command():
-      return_val = True
-    return return_val
-    pass
+    return not self.commandQueue.empty() or self.recovery_manager.has_active_command()
 
   def execute_command(self, command):
     '''
     Executes commands of type EXECUTION_COMMAND
     '''
-    clusterName = command['clusterName']
+    clusterId = command['clusterId']
     commandId = command['commandId']
     isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
     isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
     message = "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of " \
-              "cluster {cluster}.".format(
+              "cluster_id {cluster}.".format(
               commandId = str(commandId), taskId = str(command['taskId']),
-              role=command['role'], cluster=clusterName)
+              role=command['role'], cluster=clusterId)
     logger.info(message)
 
     taskId = command['taskId']
@@ -359,10 +356,10 @@ class ActionQueue(threading.Thread):
       roleResult['stderr'] = 'None'
 
     # let ambari know name of custom command
-    """
-    if command['hostLevelParams'].has_key('custom_command'):
-      roleResult['customCommand'] = command['hostLevelParams']['custom_command']
-    """
+
+    if command.has_key('custom_command'):
+      roleResult['customCommand'] = command['custom_command']
+
     if 'structuredOut' in commandresult:
       roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
     else:
@@ -370,30 +367,6 @@ class ActionQueue(threading.Thread):
 
     # let recovery manager know the current state
     if status == self.COMPLETED_STATUS:
-      # TODO STOMP:fix recovery_manager
-      """
-      if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
-          and self.controller.recovery_manager.configured_for_recovery(command['role']):
-        if command['roleCommand'] == self.ROLE_COMMAND_START:
-          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
-          self.controller.recovery_manager.update_config_staleness(command['role'], False)
-          logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
-                      ", current state of " + command['role'] + " to " +
-                       self.controller.recovery_manager.get_current_status(command['role']) )
-        elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
-          self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS)
-          logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
-                      ", current state of " + command['role'] + " to " +
-                       self.controller.recovery_manager.get_current_status(command['role']) )
-        elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND:
-          if command['hostLevelParams'].has_key('custom_command') and \
-                  command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART:
-            self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
-            self.controller.recovery_manager.update_config_staleness(command['role'], False)
-            logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command['role'] + " to " +
-                         self.controller.recovery_manager.get_current_status(command['role']) )
-      """
-
       # let ambari know that configuration tags were applied
       configHandler = ActualConfigHandler(self.config, self.configTags)
 
@@ -428,18 +401,8 @@ class ActionQueue(threading.Thread):
                                                 command['hostLevelParams']['clientsToUpdateConfigs'])
         roleResult['configurationTags'] = configHandler.read_actual_component(
             command['role'])
-    elif status == self.FAILED_STATUS:
-      # TODO STOMP: recovery manager
-      """
-      if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
-              and self.controller.recovery_manager.configured_for_recovery(command['role']):
-        if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
-          self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED)
-          logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
-                      ", current state of " + command['role'] + " to " +
-                      self.controller.recovery_manager.get_current_status(command['role']))
-      """
 
+    self.recovery_manager.process_execution_command_result(command, status)
     self.commandStatuses.put_command_status(command, roleResult)
 
   def log_command_output(self, text, taskId):
@@ -495,73 +458,6 @@ class ActionQueue(threading.Thread):
     component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
     return command, component_status_result
 
-  def process_status_command_result(self, result):
-    '''
-    Executes commands of type STATUS_COMMAND
-    '''
-    # TODO STOMP: review if we need to run this with new status commands
-    try:
-      command, component_status_result = result
-      cluster = command['clusterName']
-      service = command['serviceName']
-      component = command['componentName']
-      configurations = command['configurations']
-      if configurations.has_key('global'):
-        globalConfig = configurations['global']
-      else:
-        globalConfig = {}
-
-      # TODO STOMP: check why we need this
-      if not Script.config :
-        logger.debug('Setting Script.config to last status command configuration')
-        Script.config = command
-
-      livestatus = LiveStatus(cluster, service, component,
-                              globalConfig, self.config, self.configTags)
-
-      component_extra = None
-
-      if component_status_result['exitcode'] == 0:
-        component_status = LiveStatus.LIVE_STATUS
-        if self.controller.recovery_manager.enabled() \
-          and self.controller.recovery_manager.configured_for_recovery(component):
-          self.controller.recovery_manager.update_current_status(component, component_status)
-      else:
-        component_status = LiveStatus.DEAD_STATUS
-        if self.controller.recovery_manager.enabled() \
-          and self.controller.recovery_manager.configured_for_recovery(component):
-          if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED):
-            self.controller.recovery_manager.update_current_status(component, component_status)
-
-      request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \
-                                not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND)
-
-      if 'structuredOut' in component_status_result:
-        component_extra = component_status_result['structuredOut']
-
-      result = livestatus.build(component_status=component_status)
-      if self.controller.recovery_manager.enabled():
-        result['sendExecCmdDet'] = str(request_execution_cmd)
-
-      if component_extra is not None and len(component_extra) != 0:
-        if component_extra.has_key('alerts'):
-          result['alerts'] = component_extra['alerts']
-          del component_extra['alerts']
-
-        result['extra'] = component_extra
-
-      logger.debug("Got live status for component " + component + \
-                   " of service " + str(service) + \
-                   " of cluster " + str(cluster))
-
-      logger.debug(pprint.pformat(result))
-      if result is not None:
-        self.commandStatuses.put_command_status(command, result)
-    except Exception, err:
-      traceback.print_exc()
-      logger.warn(err)
-    pass
-
   def status_update_callback(self):
     """
     Actions that are executed every time when command status changes
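
Net effect of the ActionQueue changes above: the agent now injects recovery work locally instead of waiting for the server. When nothing is queued or running, fillRecoveryCommands() asks the RecoveryManager for auto-start commands and enqueues them, while process_execution_command() and process_execution_command_result() keep the manager's desired/current state in step with every command that runs. A minimal, self-contained sketch of that idle-queue pattern (SimpleRecoveryManager and SimpleActionQueue are illustrative stand-ins, not Ambari classes; Python 2, as in the agent code):

    import Queue  # Python 2 standard library, as used by the agent

    class SimpleRecoveryManager(object):
        def has_active_command(self):
            return False  # no recovery command currently executing

        def get_recovery_commands(self):
            # In the real agent this is derived from desired vs. current state.
            return [{'role': 'DATANODE', 'roleCommand': 'START',
                     'commandType': 'AUTO_EXECUTION_COMMAND'}]

    class SimpleActionQueue(object):
        def __init__(self, recovery_manager):
            self.commandQueue = Queue.Queue()
            self.recovery_manager = recovery_manager

        def tasks_in_progress_or_pending(self):
            return (not self.commandQueue.empty()
                    or self.recovery_manager.has_active_command())

        def fillRecoveryCommands(self):
            # Only inject recovery work when the agent is otherwise idle.
            if not self.tasks_in_progress_or_pending():
                for command in self.recovery_manager.get_recovery_commands():
                    self.commandQueue.put(command)

    queue = SimpleActionQueue(SimpleRecoveryManager())
    queue.fillRecoveryCommands()
    print queue.commandQueue.get(False)  # the auto-start command just enqueued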

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index e27a243..133701f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -136,7 +136,7 @@ class CommandStatusDict():
       'role': command['role'],
       'actionId': command['commandId'],
       'taskId': command['taskId'],
-      'clusterName': command['clusterName'],
+      'clusterId': command['clusterId'],
       'serviceName': command['serviceName'],
       'roleCommand': command['roleCommand']
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 520c97d..2ac904f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -22,6 +22,7 @@ import logging
 import threading
 
 from ambari_agent import Constants
+from ambari_agent.LiveStatus import LiveStatus
 from collections import defaultdict
 
 logger = logging.getLogger(__name__)
@@ -33,6 +34,7 @@ class ComponentStatusExecutor(threading.Thread):
     self.topology_cache = initializer_module.topology_cache
     self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
     self.stop_event = initializer_module.stop_event
+    self.recovery_manager = initializer_module.recovery_manager
     self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server
     threading.Thread.__init__(self)
 
@@ -77,7 +79,7 @@ class ComponentStatusExecutor(threading.Thread):
 
               component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
               logger.info(component_status_result)
-              status = "STARTED" if component_status_result['exitcode'] == 0 else "INSTALLED"
+              status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
 
               result = {
                 'serviceName': service_name,
@@ -90,6 +92,7 @@ class ComponentStatusExecutor(threading.Thread):
               if status != self.reported_component_status[component_name][command_name]:
                 logging.info("Status for {0} has changed to {1}".format(component_name, status))
                 cluster_reports[cluster_id].append(result)
+                self.recovery_manager.handle_status_change(component_name, status)
 
         self.send_updates_to_server(cluster_reports)
       except:
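
The new handle_status_change() call above routes every observed status change into the RecoveryManager (added in RecoveryManager.py below). The rule it applies, roughly: a live report always refreshes the recorded state, while a dead report is ignored once a component is marked INSTALL_FAILED, so recovery does not mistake a failed install for a plain stop. A hedged, standalone approximation (status strings mirror LiveStatus.LIVE_STATUS/DEAD_STATUS as used above; the INSTALL_FAILED value is assumed, and the real method also checks enabled() and configured_for_recovery() first):

    LIVE_STATUS = "STARTED"            # LiveStatus.LIVE_STATUS
    DEAD_STATUS = "INSTALLED"          # LiveStatus.DEAD_STATUS
    INSTALL_FAILED = "INSTALL_FAILED"  # recovery manager's failed-install marker (assumed value)

    current_status = {}  # component name -> last known state

    def handle_status_change(component, component_status):
        if component_status == LIVE_STATUS:
            current_status[component] = component_status
        elif current_status.get(component) != INSTALL_FAILED:
            # a dead report never overwrites a recorded install failure
            current_status[component] = component_status

    handle_status_change('DATANODE', DEAD_STATUS)
    current_status['DATANODE'] = INSTALL_FAILED    # simulate a failed INSTALL
    handle_status_change('DATANODE', DEAD_STATUS)  # ignored: stays INSTALL_FAILED
    handle_status_change('DATANODE', LIVE_STATUS)  # recovery worked: back to STARTED
    print current_status['DATANODE']               # -> STARTED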

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 40e5b12..dbf4006 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -54,7 +54,7 @@ class HeartbeatThread(threading.Thread):
     # listeners
     self.server_responses_listener = ServerResponsesListener()
     self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
-    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
+    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager)
     self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
     self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
     self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index f0c3b43..8de1fa5 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -31,7 +31,7 @@ from ambari_agent.security import AmbariStompConnection
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
-from ambari_agent.HostStatusReporter import HostStatusReporter
+from ambari_agent.RecoveryManager import RecoveryManager
 
 logger = logging.getLogger()
 
@@ -58,6 +58,7 @@ class InitializerModule:
     self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
     self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5'))
     self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
+    self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
 
     self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
 
@@ -74,6 +75,7 @@ class InitializerModule:
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
 
+    self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
     self.commandStatuses = CommandStatusDict(self)
     self.action_queue = ActionQueue(self)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index be335f2..68dd0be 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -43,11 +43,12 @@ class RecoveryManager:
   COMPONENT_NAME = "componentName"
   ROLE = "role"
   TASK_ID = "taskId"
+  CLUSTER_ID = "clusterId"
   DESIRED_STATE = "desiredState"
   HAS_STALE_CONFIG = "hasStaleConfigs"
   EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
   ROLE_COMMAND = "roleCommand"
-  HOST_LEVEL_PARAMS = "hostLevelParams"
+  COMMAND_ID = "commandId"
   PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
   PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
   PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
@@ -85,7 +86,6 @@ class RecoveryManager:
     self.retry_gap = 5
     self.max_lifetime_count = 12
 
-    self.stored_exec_commands = {}
     self.id = int(time.time())
     self.allowed_desired_states = [self.STARTED, self.INSTALLED]
     self.allowed_current_states = [self.INIT, self.INSTALLED]
@@ -98,6 +98,7 @@ class RecoveryManager:
     self.active_command_count = 0
     self.paused = False
     self.recovery_timestamp = -1
+    self.cluster_id = None
 
     if not os.path.exists(cache_dir):
       try:
@@ -113,12 +114,12 @@ class RecoveryManager:
 
     pass
 
-  def start_execution_command(self):
+  def on_execution_command_start(self):
     with self.__active_command_lock:
       self.active_command_count += 1
     pass
 
-  def stop_execution_command(self):
+  def on_execution_command_finish(self):
     with self.__active_command_lock:
       self.active_command_count -= 1
     pass
@@ -160,11 +161,18 @@ class RecoveryManager:
       pass
 
     self.statuses[component]["stale_config"] = is_config_stale
-    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
-            self.statuses[component]["stale_config"] == False:
-      self.remove_command(component)
     pass
 
+  def handle_status_change(self, component, component_status):
+    if not self.enabled() or not self.configured_for_recovery(component):
+      return
+
+    if component_status == LiveStatus.LIVE_STATUS:
+        self.update_current_status(component, component_status)
+    else:
+        if (self.get_current_status(component) != self.INSTALL_FAILED):
+          self.update_current_status(component, component_status)
+
   def update_current_status(self, component, state):
     """
     Updates the current status of a host component managed by the agent
@@ -184,9 +192,6 @@ class RecoveryManager:
     if self.statuses[component]["current"] != state:
       logger.info("current status is set to %s for %s", state, component)
     self.statuses[component]["current"] = state
-    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
-            self.statuses[component]["stale_config"] == False:
-      self.remove_command(component)
     pass
 
 
@@ -209,9 +214,6 @@ class RecoveryManager:
     if self.statuses[component]["desired"] != state:
       logger.info("desired status is set to %s for %s", state, component)
     self.statuses[component]["desired"] = state
-    if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
-            self.statuses[component]["stale_config"] == False:
-      self.remove_command(component)
     pass
 
   """
@@ -354,9 +356,10 @@ class RecoveryManager:
 
         if command:
           self.execute(component)
+          logger.info("Created recovery command %s for component %s",
+                    command[self.ROLE_COMMAND], command[self.ROLE])
           commands.append(command)
     return commands
-    pass
 
 
   def may_execute(self, action):
@@ -549,7 +552,7 @@ class RecoveryManager:
     pass
 
 
-  def update_configuration_from_registration(self, reg_resp):
+  def update_recovery_config(self, dictionary):
     """
     TODO: Server sends the recovery configuration - call update_config after parsing
     "recoveryConfig": {
@@ -573,9 +576,9 @@ class RecoveryManager:
     recovery_timestamp = -1 # Default value if recoveryTimestamp is not available.
 
 
-    if reg_resp and "recoveryConfig" in reg_resp:
-      logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"]))
-      config = reg_resp["recoveryConfig"]
+    if dictionary and "recoveryConfig" in dictionary:
+      logger.info("RecoverConfig = " + pprint.pformat(dictionary["recoveryConfig"]))
+      config = dictionary["recoveryConfig"]
       if "type" in config:
         if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
           recovery_enabled = True
@@ -681,209 +684,113 @@ class RecoveryManager:
   def get_unique_task_id(self):
     self.id += 1
     return self.id
-    pass
 
-
-  def process_status_commands(self, commands):
+  def process_execution_command_result(self, command, status):
+    """
+    Update current status for the components depending on command and its status.
+    """
     if not self.enabled():
       return
-
-    if commands and len(commands) > 0:
-      for command in commands:
-        self.store_or_update_command(command)
-        if self.EXECUTION_COMMAND_DETAILS in command:
-          logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
-
-    pass
-
-
-  def process_execution_commands(self, commands):
-    if not self.enabled():
+      
+    if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']):
       return
-
-    if commands and len(commands) > 0:
-      for command in commands:
-        if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
-          if self.ROLE in command:
-            if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
-                and self.configured_for_recovery(command[self.ROLE]):
-              self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
-              logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
-                           self.get_desired_status(command[self.ROLE]) )
-            elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
-                and self.configured_for_recovery(command[self.ROLE]):
-              self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
-              logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
-                           self.get_desired_status(command[self.ROLE]) )
-            elif command[self.HOST_LEVEL_PARAMS].has_key('custom_command') and \
-                    command[self.HOST_LEVEL_PARAMS]['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
-                    and self.configured_for_recovery(command[self.ROLE]):
-              self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
-              logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
-                           self.get_desired_status(command[self.ROLE]) )
-
-    pass
-
-
-  def store_or_update_command(self, command):
-    """
-    Stores command details by reading them from the STATUS_COMMAND
-    Update desired state as well
+      
+    if status == ActionQueue.COMPLETED_STATUS:
+      if command[self.ROLE_COMMAND] == self.ROLE_COMMAND_START:
+        self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+        #self.update_config_staleness(command['role'], False)
+        logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
+                    ", current state of " + command[self.ROLE] + " to " +
+                     self.get_current_status(command[self.ROLE]) )
+      elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+        self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+        logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
+                    ", current state of " + command[self.ROLE] + " to " +
+                     self.get_current_status(command[self.ROLE]) )
+      elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
+        if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
+          self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+          #self.update_config_staleness(command['role'], False)
+          logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " +
+                       self.get_current_status(command[self.ROLE]) )
+    elif status == ActionQueue.FAILED_STATUS:
+      if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+        self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
+        logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
+                    ", current state of " + command[self.ROLE] + " to " +
+                    self.get_current_status(command[self.ROLE]))
+
+  def process_execution_command(self, command):
+    """
+    Change desired state of the component depending on the execution command triggered.
     """
     if not self.enabled():
       return
+      
+    if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+      return
+      
+    if not self.ROLE in command:
+      return
 
-    logger.debug("Inspecting command to store/update details")
-    if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
-      payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
-      if self.PAYLOAD_LEVEL in command:
-        payloadLevel = command[self.PAYLOAD_LEVEL]
-
-      component = command[self.COMPONENT_NAME]
-      self.update_desired_status(component, command[self.DESIRED_STATE])
-      self.update_config_staleness(component, command[self.HAS_STALE_CONFIG])
-
-      if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
-        if self.EXECUTION_COMMAND_DETAILS in command:
-          # Store the execution command details
-          self.remove_command(component)
-          self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS])
-          logger.debug("Stored command details for " + component)
-        else:
-          logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
-        pass
-    pass
-
-
-  def get_install_command(self, component):
-    if self.paused:
-      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
-      return None
-
-    if self.enabled():
-      logger.debug("Using stored INSTALL command for %s", component)
-      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
-        command = copy.deepcopy(self.stored_exec_commands[component])
-        command[self.ROLE_COMMAND] = "INSTALL"
-        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
-        command[self.TASK_ID] = self.get_unique_task_id()
-        return command
-      else:
-        logger.info("INSTALL command cannot be computed as details are not received from Server.")
-    else:
-      logger.info("Recovery is not enabled. INSTALL command will not be computed.")
-    return None
-    pass
-
-  def get_stop_command(self, component):
-    if self.paused:
-      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
-      return None
-
-    if self.enabled():
-      logger.debug("Using stored STOP command for %s", component)
-      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
-        command = copy.deepcopy(self.stored_exec_commands[component])
-        command[self.ROLE_COMMAND] = "STOP"
-        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
-        command[self.TASK_ID] = self.get_unique_task_id()
-        return command
-      else:
-        logger.info("STOP command cannot be computed as details are not received from Server.")
-    else:
-      logger.info("Recovery is not enabled. STOP command will not be computed.")
-    return None
-    pass
-
-  def get_restart_command(self, component):
-    if self.paused:
-      logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
-      return None
-
-    if self.enabled():
-      logger.debug("Using stored INSTALL command for %s", component)
-      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
-        command = copy.deepcopy(self.stored_exec_commands[component])
-        command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
-        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
-        command[self.TASK_ID] = self.get_unique_task_id()
-        command[self.HOST_LEVEL_PARAMS]['custom_command'] = 'RESTART'
-        return command
-      else:
-        logger.info("RESTART command cannot be computed as details are not received from Server.")
-    else:
-      logger.info("Recovery is not enabled. RESTART command will not be computed.")
-    return None
-    pass
-
-
-  def get_start_command(self, component):
+    if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
+        and self.configured_for_recovery(command[self.ROLE]):
+      self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+      logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
+                   self.get_desired_status(command[self.ROLE]) )
+    elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
+        and self.configured_for_recovery(command[self.ROLE]):
+      self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+      logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
+                   self.get_desired_status(command[self.ROLE]) )
+    elif command.has_key('custom_command') and \
+            command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
+            and self.configured_for_recovery(command[self.ROLE]):
+      self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+      logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
+                   self.get_desired_status(command[self.ROLE]) )
+
+  def get_command(self, component, command_name):
+    """
+    Get command dictionary by component name and command_name
+    """
     if self.paused:
       logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
       return None
 
     if self.enabled():
-      logger.debug("Using stored START command for %s", component)
-      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
-        command = copy.deepcopy(self.stored_exec_commands[component])
-        command[self.ROLE_COMMAND] = "START"
-        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
-        command[self.TASK_ID] = self.get_unique_task_id()
-        return command
-      else:
-        logger.info("START command cannot be computed as details are not received from Server.")
+      command_id = self.get_unique_task_id()
+      command = {
+        self.CLUSTER_ID: self.cluster_id,
+        self.ROLE_COMMAND: command_name,
+        self.COMMAND_TYPE: ActionQueue.AUTO_EXECUTION_COMMAND,
+        self.TASK_ID: command_id,
+        self.ROLE: component,
+        self.COMMAND_ID: command_id
+      }
+      return command
     else:
       logger.info("Recovery is not enabled. START command will not be computed.")
 
     return None
-    pass
-
-
-  def command_exists(self, component, command_type):
-    if command_type == ActionQueue.EXECUTION_COMMAND:
-      self.remove_stale_command(component)
-      if component in self.stored_exec_commands:
-        return True
-
-    return False
-    pass
 
+  def get_restart_command(self, component):
+    command = self.get_command(component, "CUSTOM_COMMAND")
 
-  def remove_stale_command(self, component):
-    component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
-    if component in self.stored_exec_commands:
-      insert_time = self.stored_exec_commands[component_update_key]
-      age = self._now_() - insert_time
-      if self.COMMAND_REFRESH_DELAY_SEC < age:
-        logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old")
-        self.remove_command(component)
-    pass
+    if command is not None:
+      command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
+      command['custom_command'] = 'RESTART'
 
+    return command
 
-  def remove_command(self, component):
-    if component in self.stored_exec_commands:
-      self.__status_lock.acquire()
-      try:
-        component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
-        del self.stored_exec_commands[component]
-        del self.stored_exec_commands[component_update_key]
-        logger.debug("Removed stored command for component : " + str(component))
-        return True
-      finally:
-        self.__status_lock.release()
-    return False
-
+  def get_install_command(self, component):
+    return self.get_command(component, "INSTALL")
 
-  def add_command(self, component, command):
-    self.__status_lock.acquire()
-    try:
-      component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
-      self.stored_exec_commands[component] = command
-      self.stored_exec_commands[component_update_key] = self._now_()
-      logger.debug("Added command for component : " + str(component))
-    finally:
-      self.__status_lock.release()
+  def get_stop_command(self, component):
+    return self.get_command(component, "STOP")
 
+  def get_start_command(self, component):
+    return self.get_command(component, "START")
 
   def _read_int_(self, value, default_value=0):
     int_value = default_value
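
The largest behavioural shift in RecoveryManager.py: recovery commands are no longer deep copies of execution-command details previously cached from the server; get_command() now synthesizes a minimal AUTO_EXECUTION_COMMAND from the component name, the requested roleCommand, and the cluster_id learned from metadata, which is what removes the need for server intervention. As a sketch, assuming cluster id '0' and a generated task id of 1497965147 (illustrative values; the real id comes from get_unique_task_id()), the START command produced for a DATANODE would look like:

    recovery_start_command = {
        'clusterId':   '0',
        'role':        'DATANODE',
        'roleCommand': 'START',
        'commandType': 'AUTO_EXECUTION_COMMAND',
        'taskId':      1497965147,
        'commandId':   1497965147,
    }

    # get_restart_command() reuses the same skeleton and additionally sets:
    #   recovery_start_command['roleCommand']    = 'CUSTOM_COMMAND'
    #   recovery_start_command['custom_command'] = 'RESTART'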

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 788d381..364d8af 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -32,8 +32,9 @@ class MetadataEventListener(EventListener):
   """
   Listener of Constants.METADATA_TOPIC events from server.
   """
-  def __init__(self, configuration_cache):
-    self.topology_cache = configuration_cache
+  def __init__(self, metadata_cache, recovery_manager):
+    self.metadata_cache = metadata_cache
+    self.recovery_manager = recovery_manager
 
   def on_event(self, headers, message):
     """
@@ -46,8 +47,16 @@ class MetadataEventListener(EventListener):
     if message == {}:
       return
 
-    self.topology_cache.rewrite_cache(message['clusters'])
-    self.topology_cache.hash = message['hash']
+    self.metadata_cache.rewrite_cache(message['clusters'])
+    self.metadata_cache.hash = message['hash']
+
+    # FIXME: Recovery manager does not support multiple cluster as of now.
+    cluster_id = message['clusters'].keys()[0]
+
+    if 'recoveryConfig' in message['clusters'][cluster_id]:
+      logging.info("Updating recoveryConfig from metadata")
+      self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id])
+      self.recovery_manager.cluster_id = cluster_id
 
   def get_handled_path(self):
     return Constants.METADATA_TOPIC
\ No newline at end of file
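
With this listener change the recovery configuration now arrives on the METADATA_TOPIC rather than in the registration response: update_recovery_config() is fed the cluster's metadata entry and the cluster id is remembered for locally generated commands. A simplified sketch of the payload shape and the enable rule applied to it (field values taken from the metadata_after_registration.json fixture below; the real method also reads maxCount, windowInMinutes and recoveryTimestamp, and this parsing is only an approximation):

    metadata_for_cluster = {
        "recoveryConfig": {
            "type": "AUTO_INSTALL_START",
            "maxCount": 10,
            "windowInMinutes": 60,
            "components": "NAMENODE,DATANODE",
            "recoveryTimestamp": 1458150424380
        }
    }

    config = metadata_for_cluster.get("recoveryConfig", {})
    recovery_enabled = config.get("type") in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]
    enabled_components = config.get("components", "").split(",")
    print recovery_enabled, enabled_components  # -> True ['NAMENODE', 'DATANODE']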

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index f53097f..c41f87e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -104,12 +104,14 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     metadata_subscribe_frame = self.server.frames_queue.get()
     topologies_subscribe_frame = self.server.frames_queue.get()
     heartbeat_frame = self.server.frames_queue.get()
-    dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
-    dn_start_failed_frame = json.loads(self.server.frames_queue.get().body)
-    zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
-    zk_start_failed_frame = json.loads(self.server.frames_queue.get().body)
+    dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    dn_install_failed_frame = json.loads(self.server.frames_queue.get().body)
+    zk_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    zk_install_failed_frame = json.loads(self.server.frames_queue.get().body)
     action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
     action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+    dn_recovery_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+    dn_recovery_failed_frame = json.loads(self.server.frames_queue.get().body)
     host_status_report = json.loads(self.server.frames_queue.get().body)
 
     initializer_module.stop_event.set()
@@ -129,10 +131,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org')
     self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
     self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
-    self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START')
-    self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE')
-    self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
-    self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
+    self.assertEquals(dn_install_in_progress_frame[0]['roleCommand'], 'INSTALL')
+    self.assertEquals(dn_install_in_progress_frame[0]['role'], 'DATANODE')
+    self.assertEquals(dn_install_in_progress_frame[0]['status'], 'IN_PROGRESS')
+    self.assertEquals(dn_install_failed_frame[0]['status'], 'FAILED')
+    self.assertEquals(dn_recovery_in_progress_frame[0]['roleCommand'], 'INSTALL')
+    self.assertEquals(dn_recovery_in_progress_frame[0]['role'], 'DATANODE')
+    self.assertEquals(dn_recovery_in_progress_frame[0]['status'], 'IN_PROGRESS')
 
     #============================================================================================
     #============================================================================================
@@ -253,7 +258,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
       self.assertEquals(json_topology, json_excepted_lopology)
       #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
 
-    self.assert_with_retries(is_json_equal, tries=40, try_sleep=0.1)
+    self.assert_with_retries(is_json_equal, tries=80, try_sleep=0.1)
 
     initializer_module.stop_event.set()
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 6e84319..075699e 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -9,8 +9,7 @@
           "serviceName":"HDFS",
           "role":"DATANODE",
           "commandType":"EXECUTION_COMMAND",
-          "roleCommand":"START",
-          "clusterName": "c1",
+          "roleCommand":"INSTALL",
           "clusterId": "0",
           "configuration_credentials":{
 
@@ -19,7 +18,7 @@
             "service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
             "hooks_folder":"HDP/2.0.6/hooks",
             "script":"scripts/datanode.py",
-            "phase":"INITIAL_START",
+            "phase":"INITIAL_INSTALL",
             "max_duration_for_retries":"600",
             "command_retry_enabled":"false",
             "command_timeout":"1200",
@@ -35,8 +34,7 @@
           "serviceName":"ZOOKEEPER",
           "role":"ZOOKEEPER_SERVER",
           "commandType":"EXECUTION_COMMAND",
-          "roleCommand":"START",
-          "clusterName": "c1",
+          "roleCommand":"INSTALL",
           "configuration_credentials":{
 
           },
@@ -44,7 +42,7 @@
             "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
             "hooks_folder":"HDP/2.0.6/hooks",
             "script":"scripts/datanode.py",
-            "phase":"INITIAL_START",
+            "phase":"INITIAL_INSTALL",
             "max_duration_for_retries":"600",
             "command_retry_enabled":"false",
             "command_timeout":"1200",

http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index f60b49a..6462ccf 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -25,7 +25,8 @@
         "ambari_db_rca_driver": "org.postgresql.Driver",
         "java_home": "/usr/jdk64/jdk1.8.0_112",
         "user_list": "[\"zookeeper\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]",
-        "hooks_folder": "HDP/2.0.6/hooks"
+        "hooks_folder": "HDP/2.0.6/hooks",
+        "cluster_name": "cl1"
       },
       "serviceLevelParams": {
         "HDFS": {
@@ -37,7 +38,14 @@
       },
       "status_commands_to_run": [
         "STATUS"
-      ]
+      ],
+      "recoveryConfig": {
+        "type" : "AUTO_INSTALL_START",
+        "maxCount" : 10,
+        "windowInMinutes" : 60,
+        "components" : "NAMENODE,DATANODE",
+        "recoveryTimestamp" : 1458150424380
+      }
     }
   }
 }
\ No newline at end of file