You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/04/15 00:32:32 UTC

[2/2] ambari git commit: AMBARI-10029. Node auto-recovery (phase-I)

AMBARI-10029. Node auto-recovery (phase-I)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dcbf12ef
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dcbf12ef
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dcbf12ef

Branch: refs/heads/trunk
Commit: dcbf12ef92b3922d13e5fb00bc5551fd927cbf08
Parents: a58352b
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Tue Apr 14 15:28:02 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Tue Apr 14 15:28:16 2015 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  38 +-
 .../src/main/python/ambari_agent/Controller.py  |  23 +-
 .../ambari_agent/CustomServiceOrchestrator.py   |   8 +-
 .../src/main/python/ambari_agent/DataCleaner.py |   5 +-
 .../src/main/python/ambari_agent/Heartbeat.py   |   3 +
 .../main/python/ambari_agent/RecoveryManager.py | 563 +++++++++++++++++++
 .../test/python/ambari_agent/TestActionQueue.py |   3 +
 .../test/python/ambari_agent/TestController.py  | 106 ++++
 .../python/ambari_agent/TestRecoveryManager.py  | 430 ++++++++++++++
 .../actionmanager/ExecutionCommandWrapper.java  |   6 +-
 .../ambari/server/agent/AgentRequests.java      |  85 +++
 .../server/agent/ComponentRecoveryReport.java   |  67 +++
 .../ambari/server/agent/ComponentStatus.java    |   9 +
 .../apache/ambari/server/agent/HeartBeat.java   |  10 +
 .../ambari/server/agent/HeartBeatHandler.java   |  24 +-
 .../ambari/server/agent/HeartbeatMonitor.java   |  33 +-
 .../ambari/server/agent/RecoveryConfig.java     | 113 ++++
 .../ambari/server/agent/RecoveryReport.java     |  67 +++
 .../server/agent/RegistrationResponse.java      |  13 +-
 .../ambari/server/agent/StatusCommand.java      |  47 +-
 .../server/configuration/Configuration.java     |  68 ++-
 .../controller/AmbariManagementController.java  |  10 +
 .../AmbariManagementControllerImpl.java         | 106 +++-
 .../ambari/server/controller/HostResponse.java  |  47 +-
 .../internal/HostResourceProvider.java          |  10 +-
 .../controller/internal/URLStreamProvider.java  |   4 +-
 .../org/apache/ambari/server/state/Host.java    |  12 +
 .../ambari/server/state/host/HostImpl.java      |  24 +
 .../src/main/resources/properties.json          |   2 +
 .../server/agent/TestHeartbeatHandler.java      | 137 ++++-
 .../AmbariManagementControllerTest.java         |  45 ++
 .../internal/HostResourceProviderTest.java      | 112 +++-
 32 files changed, 2166 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 0979d79..c4e2c33 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -53,6 +53,7 @@ class ActionQueue(threading.Thread):
 
   STATUS_COMMAND = 'STATUS_COMMAND'
   EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+  AUTO_EXECUTION_COMMAND = 'AUTO_EXECUTION_COMMAND'
   BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
   ROLE_COMMAND_INSTALL = 'INSTALL'
   ROLE_COMMAND_START = 'START'
@@ -91,8 +92,8 @@ class ActionQueue(threading.Thread):
 
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for service " + \
-                    command['serviceName'] + " of cluster " + \
-                    command['clusterName'] + " to the queue.")
+                  command['serviceName'] + " of cluster " + \
+                  command['clusterName'] + " to the queue.")
       self.statusCommandQueue.put(command)
 
   def put(self, commands):
@@ -102,7 +103,8 @@ class ActionQueue(threading.Thread):
       if not command.has_key('clusterName'):
         command['clusterName'] = 'null'
 
-      logger.info("Adding " + command['commandType'] + " for service " + \
+      logger.info("Adding " + command['commandType'] + " for role " + \
+                  command['role'] + " for service " + \
                   command['serviceName'] + " of cluster " + \
                   command['clusterName'] + " to the queue.")
       if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
@@ -174,7 +176,7 @@ class ActionQueue(threading.Thread):
     commandType = command['commandType']
     logger.debug("Took an element of Queue (command type = %s)." % commandType)
     try:
-      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
+      if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
         self.execute_command(command)
       elif commandType == self.STATUS_COMMAND:
         self.execute_status_command(command)
@@ -192,6 +194,7 @@ class ActionQueue(threading.Thread):
     clusterName = command['clusterName']
     commandId = command['commandId']
     isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
+    isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
     message = "Executing command with id = {commandId} for role = {role} of " \
               "cluster {cluster}.".format(
               commandId = str(commandId), role=command['role'],
@@ -203,12 +206,21 @@ class ActionQueue(threading.Thread):
     in_progress_status = self.commandStatuses.generate_report_template(command)
     # The path of the files that contain the output log and error log use a prefix that the agent advertises to the
     # server. The prefix is defined in agent-config.ini
-    in_progress_status.update({
-      'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
-      'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
-      'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
-      'status': self.IN_PROGRESS_STATUS
-    })
+    if not isAutoExecuteCommand:
+      in_progress_status.update({
+        'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
+        'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+        'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
+        'status': self.IN_PROGRESS_STATUS
+      })
+    else:
+      in_progress_status.update({
+        'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
+        'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
+        'structuredOut' : self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
+        'status': self.IN_PROGRESS_STATUS
+      })
+
     self.commandStatuses.put_command_status(command, in_progress_status)
 
     # running command
@@ -322,6 +334,7 @@ class ActionQueue(threading.Thread):
                               globalConfig, self.config, self.configTags)
 
       component_extra = None
+      request_execution_cmd = False
 
       # For custom services, responsibility to determine service status is
       # delegated to python scripts
@@ -330,13 +343,18 @@ class ActionQueue(threading.Thread):
 
       if component_status_result['exitcode'] == 0:
         component_status = LiveStatus.LIVE_STATUS
+        self.controller.recovery_manager.update_current_status(component, component_status)
       else:
         component_status = LiveStatus.DEAD_STATUS
+        self.controller.recovery_manager.update_current_status(component, component_status)
+        request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)
 
       if component_status_result.has_key('structuredOut'):
         component_extra = component_status_result['structuredOut']
 
       result = livestatus.build(forsed_component_status= component_status)
+      if self.controller.recovery_manager.enabled():
+        result['sendExecCmdDet'] = str(request_execution_cmd)
 
       # Add security state to the result
       result['securityState'] = component_security_status_result

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 3f3e55d..959c213 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -44,6 +44,7 @@ from ambari_agent.NetUtil import NetUtil
 from ambari_agent.LiveStatus import LiveStatus
 from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
 from ambari_agent.ClusterConfiguration import  ClusterConfiguration
+from ambari_agent.RecoveryManager import  RecoveryManager
 from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
 
 logger = logging.getLogger()
@@ -81,6 +82,7 @@ class Controller(threading.Thread):
     self.heartbeat_stop_callback = heartbeat_stop_callback
     # List of callbacks that are called at agent registration
     self.registration_listeners = []
+    self.recovery_manager = RecoveryManager()
 
     # pull config directory out of config
     cache_dir = config.get('agent', 'cache_dir')
@@ -128,8 +130,10 @@ class Controller(threading.Thread):
                       self.hostname, prettyData)
 
         ret = self.sendRequest(self.registerUrl, data)
+        prettyData = pprint.pformat(ret)
+        logger.debug("Registration response is %s", prettyData)
 
-        # exitstatus is a code of error which was rised on server side.
+        # exitstatus is a code of error which was raised on server side.
         # exitstatus = 0 (OK - Default)
         # exitstatus = 1 (Registration failed because different version of agent and server)
         exitstatus = 0
@@ -158,17 +162,20 @@ class Controller(threading.Thread):
         # always update cached cluster configurations on registration
         self.cluster_configuration.update_configurations_from_heartbeat(ret)
 
+        self.recovery_manager.update_configuration_from_registration(ret)
+
         # always update alert definitions on registration
         self.alert_scheduler_handler.update_definitions(ret)
       except ssl.SSLError:
         self.repeatRegistration = False
         self.isRegistered = False
         return
-      except Exception:
+      except Exception, ex:
         # try a reconnect only after a certain amount of random time
         delay = randint(0, self.range)
         logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
-        """ Sleeping for {0} seconds and then retrying again """.format(delay)
+        logger.error("Error:" + str(ex))
+        logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
         time.sleep(delay)
 
     return ret
@@ -265,11 +272,21 @@ class Controller(threading.Thread):
 
         if 'executionCommands' in response_keys:
           execution_commands = response['executionCommands']
+          self.recovery_manager.process_execution_commands(execution_commands)
           self.addToQueue(execution_commands)
 
         if 'statusCommands' in response_keys:
+          # try storing execution command details and desired state
+          self.recovery_manager.process_status_commands(response['statusCommands'])
           self.addToStatusQueue(response['statusCommands'])
 
+        if self.actionQueue.commandQueue.empty():
+          recovery_commands = self.recovery_manager.get_recovery_commands()
+          for recovery_command in recovery_commands:
+            logger.info("Adding recovery command %s for component %s",
+                        recovery_command['roleCommand'], recovery_command['role'])
+            self.addToQueue([recovery_command])
+
         if 'alertDefinitionCommands' in response_keys:
           self.alert_scheduler_handler.update_definitions(response)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 925cfff..72fb0af 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -230,6 +230,7 @@ class CustomServiceOrchestrator():
     override_output_files=True # by default, we override status command output
     if logger.level == logging.DEBUG:
       override_output_files = False
+
     res = self.runCommand(command, self.status_commands_stdout,
                           self.status_commands_stderr, self.COMMAND_NAME_STATUS,
                           override_output_files=override_output_files)
@@ -267,7 +268,7 @@ class CustomServiceOrchestrator():
 
   def resolve_script_path(self, base_dir, script):
     """
-    Incapsulates logic of script location determination.
+    Encapsulates logic of script location determination.
     """
     path = os.path.join(base_dir, script)
     if not os.path.exists(path):
@@ -305,7 +306,7 @@ class CustomServiceOrchestrator():
     command_type = command['commandType']
     from ActionQueue import ActionQueue  # To avoid cyclic dependency
     if command_type == ActionQueue.STATUS_COMMAND:
-      # These files are frequently created, thats why we don't
+      # These files are frequently created, that's why we don't
       # store them all, but only the latest one
       file_path = os.path.join(self.tmp_dir, "status_command.json")
     else:
@@ -313,6 +314,9 @@ class CustomServiceOrchestrator():
       if 'clusterHostInfo' in command and command['clusterHostInfo']:
         command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
       file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
+      if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
+        file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
+
     # Json may contain passwords, that's why we need proper permissions
     if os.path.isfile(file_path):
       os.unlink(file_path)

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
index e767be1..f756eb4 100644
--- a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
+++ b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
@@ -28,7 +28,10 @@ import logging
 logger = logging.getLogger()
 
 class DataCleaner(threading.Thread):
-  FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+  COMMAND_FILE_NAMES_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+  AUTO_COMMAND_FILE_NAMES_PATTERN = \
+    'auto_command-\d+.json|auto_errors-\d+.txt|auto_output-\d+.txt|auto_structured-out-\d+.json'
+  FILE_NAME_PATTERN = AUTO_COMMAND_FILE_NAMES_PATTERN + "|" + COMMAND_FILE_NAMES_PATTERN
 
   def __init__(self, config):
     threading.Thread.__init__(self)

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index 4b4937e..27aef04 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -55,6 +55,9 @@ class Heartbeat:
                   'nodeStatus'        : nodeStatus
                 }
 
+    rec_status = self.actionQueue.controller.recovery_manager.get_recovery_status()
+    heartbeat['recoveryReport'] = rec_status
+
     commandsInProgress = False
     if not self.actionQueue.commandQueue.empty():
       commandsInProgress = True

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
new file mode 100644
index 0000000..3deec2b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -0,0 +1,563 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import copy
+import time
+import threading
+import pprint
+
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.LiveStatus import LiveStatus
+
+
+logger = logging.getLogger()
+
+"""
+RecoveryManager has the following capabilities:
+* Store data needed for execution commands extracted from STATUS command
+* Generate INSTALL command
+* Generate START command
+"""
+
+
+class RecoveryManager:
+  COMMAND_TYPE = "commandType"
+  PAYLOAD_LEVEL = "payloadLevel"
+  COMPONENT_NAME = "componentName"
+  ROLE = "role"
+  TASK_ID = "taskId"
+  DESIRED_STATE = "desiredState"
+  EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
+  ROLE_COMMAND = "roleCommand"
+  PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
+  PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
+  PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
+  STARTED = "STARTED"
+  INSTALLED = "INSTALLED"
+  INIT = "INIT"  # TODO: What is the state when machine is reset
+
+  default_action_counter = {
+    "lastAttempt": 0,
+    "count": 0,
+    "lastReset": 0,
+    "lifetimeCount" : 0,
+    "warnedLastAttempt": False,
+    "warnedLastReset": False,
+    "warnedThresholdReached": False
+  }
+
+
+  def __init__(self, recovery_enabled=False, auto_start_only=False):
+    self.recovery_enabled = recovery_enabled
+    self.auto_start_only = auto_start_only
+    self.max_count = 6
+    self.window_in_min = 60
+    self.retry_gap = 5
+    self.max_lifetime_count = 12
+
+    self.stored_exec_commands = {}
+    self.id = int(time.time())
+    self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+    self.allowed_current_states = [self.INIT, self.INSTALLED]
+    self.actions = {}
+    self.statuses = {}
+    self.__status_lock = threading.RLock()
+
+    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
+
+    pass
+
+
+  def enabled(self):
+    return self.recovery_enabled
+
+
+  def update_current_status(self, component, state):
+    """
+    Updates the current status of a host component managed by the agent
+    """
+    if component not in self.statuses:
+      self.__status_lock.acquire()
+      try:
+        if component not in self.statuses:
+          self.statuses[component] = {
+            "current": "",
+            "desired": ""
+          }
+      finally:
+        self.__status_lock.release()
+      pass
+
+    self.statuses[component]["current"] = state
+    pass
+
+
+  def update_desired_status(self, component, state):
+    """
+    Updates the desired status of a host component managed by the agent
+    """
+    if component not in self.statuses:
+      self.__status_lock.acquire()
+      try:
+        if component not in self.statuses:
+          self.statuses[component] = {
+            "current": "",
+            "desired": ""
+          }
+      finally:
+        self.__status_lock.release()
+      pass
+
+    self.statuses[component]["desired"] = state
+    pass
+
+
+  def requires_recovery(self, component):
+    """
+    Recovery is allowed for:
+    INISTALLED --> STARTED
+    INIT --> INSTALLED --> STARTED
+    CLIENTs may be RE-INSTALLED (TODO)
+    """
+    if not self.enabled():
+      return False
+
+    if component not in self.statuses:
+      return False
+
+    status = self.statuses[component]
+    if status["current"] == status["desired"]:
+      return False
+
+    if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
+      return False
+
+    ### No recovery to INSTALLED or INIT states
+    if status["current"] == self.STARTED:
+      return False
+
+    logger.info("%s needs recovery.", component)
+    return True
+    pass
+
+
+
+  def get_recovery_status(self):
+    """
+    Creates a status in the form
+    {
+      "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
+      "component_reports" : [
+        {
+          "name": "component_name",
+          "numAttempts" : "x",
+          "limitReached" : "true|false"
+          "status" : "REQUIRES_RECOVERY|RECOVERY_COMMAND_REQUESTED|RECOVERY_COMMAND_ISSUED|NO_RECOVERY_NEEDED"
+        }
+      ]
+    }
+    """
+    report = {}
+    report["summary"] = "DISABLED"
+    if self.enabled():
+      report["summary"] = "RECOVERABLE"
+      num_limits_reached = 0
+      recovery_states = []
+      report["componentReports"] = recovery_states
+      self.__status_lock.acquire()
+      try:
+        for component in self.actions.keys():
+          action = self.actions[component]
+          recovery_state = {}
+          recovery_state["name"] = component
+          recovery_state["numAttempts"] = action["lifetimeCount"]
+          recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
+          recovery_states.append(recovery_state)
+          if recovery_state["limitReached"] == True:
+            num_limits_reached += 1
+          pass
+      finally:
+        self.__status_lock.release()
+
+      if num_limits_reached > 0:
+        report["summary"] = "PARTIALLY_RECOVERABLE"
+        if num_limits_reached == len(recovery_states):
+          report["summary"] = "UNRECOVERABLE"
+
+    return report
+    pass
+
+  def get_recovery_commands(self):
+    """
+    This method computes the recovery commands for the following transitions
+    INSTALLED --> STARTED
+    INIT --> INSTALLED
+    """
+    commands = []
+    for component in self.statuses.keys():
+      if self.requires_recovery(component) and self.may_execute(component):
+        status = copy.deepcopy(self.statuses[component])
+        if status["desired"] == self.STARTED:
+          if status["current"] == self.INSTALLED:
+            command = self.get_start_command(component)
+          elif status["current"] == self.INIT:
+            command = self.get_install_command(component)
+        elif status["desired"] == self.INSTALLED:
+          if status["current"] == self.INIT:
+            command = self.get_install_command(component)
+        if command:
+          self.execute(component)
+          commands.append(command)
+    return commands
+    pass
+
+
+  def may_execute(self, action):
+    """
+    Check if an action can be executed
+    """
+    if not action or action.strip() == "":
+      return False
+
+    if action not in self.actions:
+      self.__status_lock.acquire()
+      try:
+        self.actions[action] = copy.deepcopy(self.default_action_counter)
+      finally:
+        self.__status_lock.release()
+    return self._execute_action_chk_only(action)
+    pass
+
+
+  def execute(self, action):
+    """
+    Executed an action
+    """
+    if not action or action.strip() == "":
+      return False
+
+    if action not in self.actions:
+      self.__status_lock.acquire()
+      try:
+        self.actions[action] = copy.deepcopy(self.default_action_counter)
+      finally:
+        self.__status_lock.release()
+    return self._execute_action_(action)
+    pass
+
+
+  def _execute_action_(self, action_name):
+    """
+    _private_ implementation of [may] execute
+    """
+    action_counter = self.actions[action_name]
+    now = self._now_()
+    seconds_since_last_attempt = now - action_counter["lastAttempt"]
+    if action_counter["lifetimeCount"] < self.max_lifetime_count:
+      if action_counter["count"] < self.max_count:
+        if seconds_since_last_attempt > self.retry_gap_in_sec:
+          action_counter["count"] += 1
+          action_counter["lifetimeCount"] +=1
+          if self.retry_gap > 0:
+            action_counter["lastAttempt"] = now
+          action_counter["warnedLastAttempt"] = False
+          if action_counter["count"] == 1:
+            action_counter["lastReset"] = now
+          return True
+        else:
+          if action_counter["warnedLastAttempt"] == False:
+            action_counter["warnedLastAttempt"] = True
+            logger.warn(
+              "%s seconds has not passed since last occurrence %s seconds back for %s. " +
+              "Will silently skip execution without warning till retry gap is passed",
+              self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+          else:
+            logger.debug(
+              "%s seconds has not passed since last occurrence %s seconds back for %s",
+              self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+      else:
+        sec_since_last_reset = now - action_counter["lastReset"]
+        if sec_since_last_reset > self.window_in_sec:
+          action_counter["count"] = 1
+          action_counter["lifetimeCount"] +=1
+          if self.retry_gap > 0:
+            action_counter["lastAttempt"] = now
+          action_counter["lastReset"] = now
+          action_counter["warnedLastReset"] = False
+          return True
+        else:
+          if action_counter["warnedLastReset"] == False:
+            action_counter["warnedLastReset"] = True
+            logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
+                        "Will silently skip execution without warning till window is reset",
+                        action_counter["count"], self.window_in_min, action_name)
+          else:
+            logger.debug("%s occurrences in %s minutes reached the limit for %s",
+                         action_counter["count"], self.window_in_min, action_name)
+    else:
+      if action_counter["warnedThresholdReached"] == False:
+        action_counter["warnedThresholdReached"] = True
+        logger.warn("%s occurrences in agent life time reached the limit for %s. " +
+                    "Will silently skip execution without warning till window is reset",
+                    action_counter["lifetimeCount"], action_name)
+      else:
+        logger.debug("%s occurrences in agent life time reached the limit for %s",
+                     action_counter["lifetimeCount"], action_name)
+    return False
+    pass
+
+
+  def _execute_action_chk_only(self, action_name):
+    """
+    _private_ implementation of [may] execute check only
+    """
+    action_counter = self.actions[action_name]
+    now = self._now_()
+    seconds_since_last_attempt = now - action_counter["lastAttempt"]
+
+    if action_counter["lifetimeCount"] < self.max_lifetime_count:
+      if action_counter["count"] < self.max_count:
+        if seconds_since_last_attempt > self.retry_gap_in_sec:
+          return True
+      else:
+        sec_since_last_reset = now - action_counter["lastReset"]
+        if sec_since_last_reset > self.window_in_sec:
+          return True
+
+    return False
+    pass
+
+  def _now_(self):
+    return int(time.time())
+    pass
+
+
+  def update_configuration_from_registration(self, reg_resp):
+    """
+    TODO: Server sends the recovery configuration - call update_config after parsing
+    "recovery_config": {
+      "type" : "DEFAULT|AUTO_START|FULL",
+      "maxCount" : 10,
+      "windowInMinutes" : 60,
+      "retryGap" : 0 }
+    """
+
+    recovery_enabled = False
+    auto_start_only = True
+    max_count = 6
+    window_in_min = 60
+    retry_gap = 5
+    max_lifetime_count = 12
+
+    if reg_resp and "recoveryConfig" in reg_resp:
+      config = reg_resp["recoveryConfig"]
+      if "type" in config:
+        if config["type"] in ["AUTO_START", "FULL"]:
+          recovery_enabled = True
+          if config["type"] == "FULL":
+            auto_start_only = False
+      if "maxCount" in config:
+        max_count = self._read_int_(config["maxCount"], max_count)
+      if "windowInMinutes" in config:
+        window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
+      if "retryGap" in config:
+        retry_gap = self._read_int_(config["retryGap"], retry_gap)
+      if 'maxLifetimeCount' in config:
+        max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
+    self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only)
+    pass
+
+
+  def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only):
+    """
+    Update recovery configuration, recovery is disabled if configuration values
+    are not correct
+    """
+    self.recovery_enabled = False;
+    if max_count <= 0:
+      logger.warn("Recovery disabled: max_count must be a non-negative number")
+      return
+
+    if window_in_min <= 0:
+      logger.warn("Recovery disabled: window_in_min must be a non-negative number")
+      return
+
+    if retry_gap < 1:
+      logger.warn("Recovery disabled: retry_gap must be a positive number and at least 1")
+      return
+    if retry_gap >= window_in_min:
+      logger.warn("Recovery disabled: retry_gap must be smaller than window_in_min")
+      return
+    if max_lifetime_count < 0 or max_lifetime_count < max_count:
+      logger.warn("Recovery disabled: max_lifetime_count must more than 0 and >= max_count")
+      return
+
+    self.max_count = max_count
+    self.window_in_min = window_in_min
+    self.retry_gap = retry_gap
+    self.window_in_sec = window_in_min * 60
+    self.retry_gap_in_sec = retry_gap * 60
+    self.auto_start_only = auto_start_only
+    self.max_lifetime_count = max_lifetime_count
+
+    self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+    self.allowed_current_states = [self.INIT, self.INSTALLED]
+
+    if self.auto_start_only:
+      self.allowed_desired_states = [self.STARTED]
+      self.allowed_current_states = [self.INSTALLED]
+
+    self.recovery_enabled = recovery_enabled
+    if self.recovery_enabled:
+      logger.info(
+        "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and lifetime max being %s.",
+        self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count)
+    pass
+
+
+  def get_unique_task_id(self):
+    self.id += 1
+    return self.id
+    pass
+
+
+  def process_status_commands(self, commands):
+    if not self.enabled():
+      return
+
+    if commands and len(commands) > 0:
+      for command in commands:
+        self.store_or_update_command(command)
+        if self.EXECUTION_COMMAND_DETAILS in command:
+          logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
+
+    pass
+
+
+  def process_execution_commands(self, commands):
+    if not self.enabled():
+      return
+
+    if commands and len(commands) > 0:
+      for command in commands:
+        if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+          if self.ROLE in command:
+            if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+              self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+            if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
+              self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+    pass
+
+
+  def store_or_update_command(self, command):
+    """
+    Stores command details by reading them from the STATUS_COMMAND
+    Update desired state as well
+    """
+    if not self.enabled():
+      return
+
+    logger.debug("Inspecting command to store/update details")
+    if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
+      payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
+      if self.PAYLOAD_LEVEL in command:
+        payloadLevel = command[self.PAYLOAD_LEVEL]
+
+      component = command[self.COMPONENT_NAME]
+      self.update_desired_status(component, command[self.DESIRED_STATE])
+
+      if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
+        if self.EXECUTION_COMMAND_DETAILS in command:
+          # Store the execution command details
+          self.remove_command(component)
+          self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS]
+          logger.debug("Stored command details for " + component)
+        else:
+          logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
+        pass
+    pass
+
+
+  def get_install_command(self, component):
+    if self.enabled():
+      logger.debug("Using stored INSTALL command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "INSTALL"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        return command
+      else:
+        logger.info("INSTALL command cannot be computed.")
+    else:
+      logger.info("Recovery is not enabled. INSTALL command will not be computed.")
+    return None
+    pass
+
+
+  def get_start_command(self, component):
+    if self.enabled():
+      logger.debug("Using stored START command for %s", component)
+      if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+        command = copy.deepcopy(self.stored_exec_commands[component])
+        command[self.ROLE_COMMAND] = "START"
+        command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+        command[self.TASK_ID] = self.get_unique_task_id()
+        return command
+      else:
+        logger.info("START command cannot be computed.")
+    else:
+      logger.info("Recovery is not enabled. START command will not be computed.")
+
+    return None
+    pass
+
+
+  def command_exists(self, component, command_type):
+    if command_type == ActionQueue.EXECUTION_COMMAND:
+      if component in self.stored_exec_commands:
+        return True
+
+    return False
+    pass
+
+
+  def remove_command(self, component):
+    if component in self.stored_exec_commands:
+      del self.stored_exec_commands[component]
+      return True
+    return False
+
+
+  def _read_int_(self, value, default_value=0):
+    int_value = default_value
+    try:
+      int_value = int(value)
+    except ValueError:
+      pass
+    return int_value
+
+
+def main(argv=None):
+  cmd_mgr = RecoveryManager()
+  pass
+
+
+if __name__ == '__main__':
+  main()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 6e445c1..b9cbbe0 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -35,6 +35,7 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 from ambari_agent.PythonExecutor import PythonExecutor
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.RecoveryManager import RecoveryManager
 from FileCache import FileCache
 from ambari_commons import OSCheck
 from only_for_platform import only_for_platform, get_platform, not_for_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
@@ -544,6 +545,8 @@ class TestActionQueue(TestCase):
 
     build_mock.return_value = {'dummy report': '' }
 
+    dummy_controller.recovery_manager = RecoveryManager()
+
     requestComponentStatus_mock.reset_mock()
     requestComponentStatus_mock.return_value = {'exitcode': 0 }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 76470b3..f379fbf 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -598,6 +598,112 @@ class TestController(unittest.TestCase):
     self.assertEquals(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
     self.assertEquals(LiveStatus_mock.COMPONENTS, components_expected)
 
+  @patch("socket.gethostbyname")
+  @patch("json.dumps")
+  @patch("time.sleep")
+  @patch("pprint.pformat")
+  @patch.object(Controller, "randint")
+  @patch.object(Controller, "LiveStatus")
+  def test_recoveryRegConfig(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
+                    dumpsMock, socketGhbnMock):
+    self.assertEquals(self.controller.recovery_manager.recovery_enabled, False)
+    self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+    self.assertEquals(self.controller.recovery_manager.max_count, 6)
+    self.assertEquals(self.controller.recovery_manager.window_in_min, 60)
+    self.assertEquals(self.controller.recovery_manager.retry_gap, 5)
+
+    out = StringIO.StringIO()
+    sys.stdout = out
+
+
+    dumpsMock.return_value = '{"valid_object": true}'
+    socketGhbnMock.return_value = "host1"
+
+    sendRequest = MagicMock(name="sendRequest")
+    self.controller.sendRequest = sendRequest
+
+    register = MagicMock(name="register")
+    self.controller.register = register
+
+    sendRequest.return_value = {
+      "responseId": 1,
+      "recoveryConfig": {
+        "type": "FULL",
+        "maxCount": 5,
+        "windowInMinutes": 50,
+        "retryGap": 3,
+        "maxLifetimeCount": 7},
+      "log": "", "exitstatus": "0"}
+
+    self.controller.isRegistered = False
+    self.controller.registerWithServer()
+
+    self.assertEquals(self.controller.recovery_manager.recovery_enabled, True)
+    self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+    self.assertEquals(self.controller.recovery_manager.max_count, 5)
+    self.assertEquals(self.controller.recovery_manager.window_in_min, 50)
+    self.assertEquals(self.controller.recovery_manager.retry_gap, 3)
+    self.assertEquals(self.controller.recovery_manager.max_lifetime_count, 7)
+
+    sys.stdout = sys.__stdout__
+
+    self.controller.sendRequest = Controller.Controller.sendRequest
+    self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
+    pass
+
+  @patch.object(threading._Event, "wait")
+  @patch("time.sleep")
+  @patch("json.dumps")
+  def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
+
+    out = StringIO.StringIO()
+    sys.stdout = out
+
+    hearbeat = MagicMock()
+    self.controller.heartbeat = hearbeat
+    event_mock.return_value = False
+    dumpsMock.return_value = "data"
+
+    sendRequest = MagicMock(name="sendRequest")
+    self.controller.sendRequest = sendRequest
+    addToQueue = MagicMock(name="addToQueue")
+    addToStatusQueue = MagicMock(name="addToStatusQueue")
+    self.addToQueue = addToQueue
+    self.addToStatusQueue = addToStatusQueue
+
+    process_execution_commands = MagicMock(name="process_execution_commands")
+    self.controller.recovery_manager.process_execution_commands = process_execution_commands
+    process_status_commands = MagicMock(name="process_status_commands")
+    self.controller.recovery_manager.process_status_commands = process_status_commands
+
+    self.controller.responseId = 0
+    response = {"responseId":1, "statusCommands": "commands2", "executionCommands" : "commands1", "log":"", "exitstatus":"0"}
+    sendRequest.return_value = response
+
+    def one_heartbeat(*args, **kwargs):
+      self.controller.DEBUG_STOP_HEARTBEATING = True
+      return response
+
+    sendRequest.side_effect = one_heartbeat
+
+    actionQueue = MagicMock()
+    actionQueue.isIdle.return_value = True
+
+    # one successful request, after stop
+    self.controller.actionQueue = actionQueue
+    self.controller.heartbeatWithServer()
+    self.assertTrue(sendRequest.called)
+    self.assertTrue(process_execution_commands.called)
+    self.assertTrue(process_status_commands.called)
+    process_execution_commands.assert_called_with("commands1")
+    process_status_commands.assert_called_with("commands2")
+
+    self.controller.heartbeatWithServer()
+    sys.stdout = sys.__stdout__
+    self.controller.sendRequest = Controller.Controller.sendRequest
+    self.controller.sendRequest = Controller.Controller.addToQueue
+    self.controller.sendRequest = Controller.Controller.addToStatusQueue
+    pass
 
 if __name__ == "__main__":
   unittest.main(verbosity=2)

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
new file mode 100644
index 0000000..4cc2c0b
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -0,0 +1,430 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import copy
+
+from ambari_agent.RecoveryManager import RecoveryManager
+from mock.mock import patch, MagicMock, call
+
+
+class TestRecoveryManager(TestCase):
+  command = {
+    "commandType": "STATUS_COMMAND",
+    "payloadLevel": "EXECUTION_COMMAND",
+    "componentName": "NODEMANAGER",
+    "desiredState": "STARTED",
+    "executionCommandDetails": {
+      "commandType": "EXECUTION_COMMAND",
+      "roleCommand": "INSTALL",
+      "role": "NODEMANAGER",
+      "configurations": {
+        "capacity-scheduler": {
+          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+        "capacity-calculator": {
+          "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+        "commandParams": {
+          "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+        }
+      }
+    }
+  }
+
+  exec_command1 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "INSTALL",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  exec_command2 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "START",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  exec_command3 = {
+    "commandType": "EXECUTION_COMMAND",
+    "roleCommand": "SERVICE_CHECK",
+    "role": "NODEMANAGER",
+    "configurations": {
+      "capacity-scheduler": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "capacity-calculator": {
+        "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+      "commandParams": {
+        "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+      }
+    }
+  }
+
+  def setUp(self):
+    pass
+
+  def tearDown(self):
+    pass
+
+  @patch.object(RecoveryManager, "update_desired_status")
+  def test_process_commands(self, mock_uds):
+    rm = RecoveryManager(True)
+    rm.process_status_commands(None)
+    self.assertFalse(mock_uds.called)
+
+    rm.process_status_commands([])
+    self.assertFalse(mock_uds.called)
+
+    rm.process_status_commands([self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_status_commands([self.command, self.exec_command1, self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")], [call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_execution_commands([self.exec_command1, self.exec_command2, self.exec_command3])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")], [call("NODEMANAGER", "STARTED")])
+
+    mock_uds.reset_mock()
+
+    rm.process_execution_commands([self.exec_command1, self.command])
+    mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")])
+    pass
+
+  def test_defaults(self):
+    rm = RecoveryManager()
+    self.assertFalse(rm.enabled())
+    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+    pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_sliding_window(self, time_mock):
+    time_mock.side_effect = \
+      [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
+       1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
+
+    rm = RecoveryManager(True, False)
+    self.assertTrue(rm.enabled())
+
+    rm.update_config(0, 60, 5, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 5, 12, True, False)
+    self.assertTrue(rm.enabled())
+
+    rm.update_config(6, 0, 5, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 0, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 1, 12, True, False)
+    self.assertTrue(rm.enabled())
+
+    rm.update_config(6, 60, 61, 12, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 5, 0, True, False)
+    self.assertFalse(rm.enabled())
+
+    rm.update_config(6, 60, 5, 4, True, False)
+    self.assertFalse(rm.enabled())
+
+    # maximum 2 in 2 minutes and at least 1 minute wait
+    rm.update_config(2, 5, 1, 4, True, False)
+    self.assertTrue(rm.enabled())
+
+    # T = 1000-2
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+    self.assertTrue(rm.may_execute("NODEMANAGER"))
+
+    # T = 1003-4
+    self.assertTrue(rm.execute("NODEMANAGER"))
+    self.assertFalse(rm.execute("NODEMANAGER"))  # too soon
+
+    # T = 1071
+    self.assertTrue(rm.execute("NODEMANAGER"))  # 60+ seconds passed
+
+    # T = 1150-3
+    self.assertFalse(rm.execute("NODEMANAGER"))  # limit 2 exceeded
+    self.assertFalse(rm.may_execute("NODEMANAGER"))
+    self.assertTrue(rm.execute("DATANODE"))
+    self.assertTrue(rm.may_execute("NAMENODE"))
+
+    # T = 1400-1
+    self.assertTrue(rm.execute("NODEMANAGER"))  # windows reset
+    self.assertFalse(rm.may_execute("NODEMANAGER"))  # too soon
+
+    # maximum 2 in 2 minutes and no min wait
+    rm.update_config(2, 5, 1, 5, True, True)
+
+    # T = 1500-3
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertTrue(rm.may_execute("NODEMANAGER2"))
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertFalse(rm.execute("NODEMANAGER2"))  # max limit
+
+    # T = 1900-2
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+
+    # T = 2300-2
+    # lifetime max reached
+    self.assertTrue(rm.execute("NODEMANAGER2"))
+    self.assertFalse(rm.execute("NODEMANAGER2"))
+    pass
+
+  def test_recovery_required(self):
+    rm = RecoveryManager(True, False)
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "STARTED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "XYS")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+    self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+    rm = RecoveryManager(True, True)
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "START")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "START")
+    self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+    pass
+
+  @patch('time.time', MagicMock(side_effects=[1]))
+  def test_store_from_status_and_use(self):
+    rm = RecoveryManager(True)
+
+    command1 = copy.deepcopy(self.command)
+
+    rm.store_or_update_command(command1)
+    self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
+
+    install_command = rm.get_install_command("NODEMANAGER")
+    start_command = rm.get_start_command("NODEMANAGER")
+
+    self.assertEqual("INSTALL", install_command["roleCommand"])
+    self.assertEqual("START", start_command["roleCommand"])
+    self.assertEqual("AUTO_EXECUTION_COMMAND", install_command["commandType"])
+    self.assertEqual("AUTO_EXECUTION_COMMAND", start_command["commandType"])
+    self.assertEqual("NODEMANAGER", install_command["role"])
+    self.assertEqual("NODEMANAGER", start_command["role"])
+    self.assertEquals(install_command["configurations"], start_command["configurations"])
+
+    self.assertEqual(2, install_command["taskId"])
+    self.assertEqual(3, start_command["taskId"])
+
+    self.assertEqual(None, rm.get_install_command("component2"))
+    self.assertEqual(None, rm.get_start_command("component2"))
+
+    self.assertTrue(rm.remove_command("NODEMANAGER"))
+    self.assertFalse(rm.remove_command("NODEMANAGER"))
+
+    self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+    self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+    self.assertEqual(None, rm.get_install_command("component2"))
+    self.assertEqual(None, rm.get_start_command("component2"))
+
+    pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_get_recovery_commands(self, time_mock):
+    time_mock.side_effect = \
+      [1000, 2000, 3000, 4000, 5000, 6000]
+    rm = RecoveryManager(True)
+    rm.update_config(10, 5, 1, 11, True, False)
+
+    command1 = copy.deepcopy(self.command)
+
+    rm.store_or_update_command(command1)
+
+    rm.update_current_status("NODEMANAGER", "INSTALLED")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("START", commands[0]["roleCommand"])
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "STARTED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+    rm.update_config(2, 5, 1, 5, True, True)
+    rm.update_current_status("NODEMANAGER", "INIT")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(0, len(commands))
+    pass
+
+  @patch.object(RecoveryManager, "update_config")
+  def test_update_rm_config(self, mock_uc):
+    rm = RecoveryManager()
+    rm.update_configuration_from_registration(None)
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration({})
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+      "type" : "DEFAULT"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "FULL"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "AUTO_START",
+        "max_count" : "med"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "AUTO_START",
+        "maxCount" : "5",
+        "windowInMinutes" : 20,
+        "retryGap" : 2,
+        "maxLifetimeCount" : 5}}
+    )
+    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True)])
+  pass
+
+  @patch.object(RecoveryManager, "_now_")
+  def test_recovery_report(self, time_mock):
+    time_mock.side_effect = \
+      [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
+
+    rm = RecoveryManager()
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "DISABLED"})
+
+    rm.update_config(2, 5, 1, 4, True, True)
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
+
+    rm.execute("PUMA")
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+                               "componentReports": [{"name": "PUMA", "numAttempts": 1, "limitReached": False}]})
+    rm.execute("PUMA")
+    rm.execute("LION")
+
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 1, "limitReached": False},
+                                 {"name": "PUMA", "numAttempts": 2, "limitReached": False}
+                               ]})
+    rm.execute("PUMA")
+    rm.execute("LION")
+    rm.execute("PUMA")
+    rm.execute("PUMA")
+    rm.execute("LION")
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "PARTIALLY_RECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 3, "limitReached": False},
+                                 {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+                               ]})
+
+    rm.execute("LION")
+    rec_st = rm.get_recovery_status()
+    self.assertEquals(rec_st, {"summary": "UNRECOVERABLE",
+                               "componentReports": [
+                                 {"name": "LION", "numAttempts": 4, "limitReached": True},
+                                 {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+                               ]})
+    pass
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
index 79d8925..3aaf23c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
@@ -100,9 +100,9 @@ public class ExecutionCommandWrapper {
           Map<String, Map<String, Map<String, String>>> configAttributes = configHelper.getEffectiveConfigAttributes(cluster,
               executionCommand.getConfigurationTags());
 
-          for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurance : configAttributes.entrySet()) {
-            String type = attributesOccurance.getKey();
-            Map<String, Map<String, String>> attributes = attributesOccurance.getValue();
+          for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurrence : configAttributes.entrySet()) {
+            String type = attributesOccurrence.getKey();
+            Map<String, Map<String, String>> attributes = attributesOccurrence.getValue();
 
             if (executionCommand.getConfigurationAttributes() != null) {
               if (!executionCommand.getConfigurationAttributes().containsKey(type)) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
new file mode 100644
index 0000000..2980f38
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Captures various agent requests that it sends as part of requests
+ */
+
+@Singleton
+public class AgentRequests {
+  private static Log LOG = LogFactory.getLog(HeartbeatMonitor.class);
+  private final Map<String, Map<String, Boolean>> requiresExecCmdDetails = new HashMap<String, Map<String, Boolean>>();
+  private final Object _lock = new Object();
+
+  /**
+   * Creates a holder for agent requests
+   */
+  public AgentRequests() {
+  }
+
+  public void setExecutionDetailsRequest(String host, String component, String requestExecutionCmd) {
+    if (StringUtils.isNotBlank(requestExecutionCmd)) {
+      LOG.debug("Setting need for exec command to " + requestExecutionCmd + " for " + component);
+      Map<String, Boolean> perHostRequiresExecCmdDetails = getPerHostRequiresExecCmdDetails(host);
+      if (Boolean.TRUE.toString().toUpperCase().equals(requestExecutionCmd.toUpperCase())) {
+        perHostRequiresExecCmdDetails.put(component, Boolean.TRUE);
+      } else {
+        perHostRequiresExecCmdDetails.put(component, Boolean.FALSE);
+      }
+    }
+  }
+
+  public Boolean shouldSendExecutionDetails(String host, String component) {
+
+    Map<String, Boolean> perHostRequiresExecCmdDetails = getPerHostRequiresExecCmdDetails(host);
+    if (perHostRequiresExecCmdDetails != null && perHostRequiresExecCmdDetails.containsKey(component)) {
+      LOG.debug("Sending exec command details for " + component);
+      return perHostRequiresExecCmdDetails.get(component);
+    }
+
+    return Boolean.FALSE;
+  }
+
+  private Map<String, Boolean> getPerHostRequiresExecCmdDetails(String host) {
+    if (!requiresExecCmdDetails.containsKey(host)) {
+      synchronized (_lock) {
+        if (!requiresExecCmdDetails.containsKey(host)) {
+          requiresExecCmdDetails.put(host, new HashMap<String, Boolean>());
+        }
+      }
+    }
+
+    return requiresExecCmdDetails.get(host);
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append("requiresExecCmdDetails: ").append(requiresExecCmdDetails.toString()).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
new file mode 100644
index 0000000..cf9272e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+public class ComponentRecoveryReport {
+  private String name;
+  private int numAttempts;
+  private boolean limitReached;
+
+
+  @JsonProperty("name")
+  public String getName() {
+    return name;
+  }
+
+  @JsonProperty("name")
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @JsonProperty("num_attempts")
+  public int getNumAttempts() {
+    return numAttempts;
+  }
+
+  @JsonProperty("num_attempts")
+  public void setNumAttempts(int numAttempts) {
+    this.numAttempts = numAttempts;
+  }
+
+  @JsonProperty("limit_reached")
+  public boolean getLimitReached() {
+    return limitReached;
+  }
+
+  @JsonProperty("limit_reached")
+  public void setLimitReached(boolean limitReached) {
+    this.limitReached = limitReached;
+  }
+
+  @Override
+  public String toString() {
+    return "ComponentRecoveryReport{" +
+           "name='" + name + '\'' +
+           ", numFailures='" + numAttempts + '\'' +
+           ", limitReached='" + limitReached + '\'' +
+           '}';
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
index 57f25ff..5591ae8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
@@ -34,6 +34,7 @@ public class ComponentStatus {
    * @see org.apache.ambari.server.state.SecurityState
    */
   private String securityState;
+  private String sendExecCmdDet = "False";
 
   private String serviceName;
   private String clusterName;
@@ -61,6 +62,14 @@ public class ComponentStatus {
     return status;
   }
 
+  public void setSendExecCmdDet(String sendExecCmdDet) {
+    this.sendExecCmdDet = sendExecCmdDet;
+  }
+
+  public String getSendExecCmdDet() {
+    return this.sendExecCmdDet;
+  }
+
   public void setStatus(String status) {
     this.status = status;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
index 2f2688b..1430aa2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
@@ -41,6 +41,7 @@ public class HeartBeat {
   HostStatus nodeStatus;
   private AgentEnv agentEnv = null;
   private List<Alert> alerts = null;
+  private RecoveryReport recoveryReport;
 
   public long getResponseId() {
     return responseId;
@@ -84,6 +85,14 @@ public class HeartBeat {
     this.nodeStatus = nodeStatus;
   }
 
+  public RecoveryReport getRecoveryReport() {
+    return recoveryReport;
+  }
+
+  public void setRecoveryReport(RecoveryReport recoveryReport) {
+    this.recoveryReport = recoveryReport;
+  }
+
   public AgentEnv getAgentEnv() {
     return agentEnv;
   }
@@ -129,6 +138,7 @@ public class HeartBeat {
             ", reports=" + reports +
             ", componentStatus=" + componentStatus +
             ", nodeStatus=" + nodeStatus +
+            ", recoveryReport=" + recoveryReport +
             '}';
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 51d6bc1..596525b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -156,6 +156,7 @@ public class HeartBeatHandler {
   @Inject
   private VersionEventPublisher versionEventPublisher;
 
+
   /**
    * KerberosPrincipalHostDAO used to set and get Kerberos principal details
    */
@@ -245,6 +246,11 @@ public class HeartBeatHandler {
       }
     }
 
+    if (heartbeat.getRecoveryReport() != null) {
+      RecoveryReport rr = heartbeat.getRecoveryReport();
+      processRecoveryReport(rr, hostname);
+    }
+
     try {
       if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
         hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
@@ -254,7 +260,7 @@ public class HeartBeatHandler {
             null));
       }
     } catch (InvalidStateTransitionException ex) {
-      LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
+      LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
       hostObject.setState(HostState.INIT);
       return createRegisterCommand();
     }
@@ -262,7 +268,7 @@ public class HeartBeatHandler {
     // Examine heartbeat for command reports
     processCommandReports(heartbeat, hostname, clusterFsm, now);
 
-    // Examine heartbeart for component live status reports
+    // Examine heartbeat for component live status reports
     processStatusReports(heartbeat, hostname, clusterFsm);
 
     // Calculate host status
@@ -317,6 +323,12 @@ public class HeartBeatHandler {
     }
   }
 
+  protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
+    LOG.debug("Received recovery report: " + recoveryReport.toString());
+    Host host = clusterFsm.getHost(hostname);
+    host.setRecoveryReport(recoveryReport);
+  }
+
   protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
 
     Host host = clusterFsm.getHost(hostname);
@@ -696,6 +708,9 @@ public class HeartBeatHandler {
                       " (" + e.getMessage() + ")");
                 }
               }
+
+              this.heartbeatMonitor.getAgentRequests()
+                  .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
             } else {
               // TODO: What should be done otherwise?
             }
@@ -941,6 +956,11 @@ public class HeartBeatHandler {
     List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
     response.setAlertDefinitionCommands(alertDefinitionCommands);
 
+    response.setRecoveryConfig(RecoveryConfig.getRecoveryConfig(config));
+    if(response.getRecoveryConfig() != null) {
+      LOG.debug("Recovery configuration set to " + response.getRecoveryConfig().toString());
+    }
+
     Long requestId = 0L;
     hostResponseIds.put(hostname, requestId);
     response.setResponseId(requestId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index d44e0a8..ad55c05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -33,7 +33,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
+import com.google.inject.Inject;
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.RoleCommand;
 import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
@@ -75,6 +77,7 @@ public class HeartbeatMonitor implements Runnable {
   private final AmbariMetaInfo ambariMetaInfo;
   private final AmbariManagementController ambariManagementController;
   private final Configuration configuration;
+  private final AgentRequests agentRequests;
 
   public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
                           int threadWakeupInterval, Injector injector) {
@@ -87,6 +90,7 @@ public class HeartbeatMonitor implements Runnable {
     ambariManagementController = injector.getInstance(
             AmbariManagementController.class);
     configuration = injector.getInstance(Configuration.class);
+    agentRequests = new AgentRequests();
   }
 
   public void shutdown() {
@@ -106,6 +110,10 @@ public class HeartbeatMonitor implements Runnable {
     return monitorThread.isAlive();
   }
 
+  public AgentRequests getAgentRequests() {
+    return this.agentRequests;
+  }
+
   @Override
   public void run() {
     while (shouldRun) {
@@ -217,7 +225,7 @@ public class HeartbeatMonitor implements Runnable {
   }
 
   /**
-   * Generates status command and fills all apropriate fields.
+   * Generates status command and fills all appropriate fields.
    * @throws AmbariException
    */
   private StatusCommand createStatusCommand(String hostname, Cluster cluster,
@@ -241,6 +249,10 @@ public class HeartbeatMonitor implements Runnable {
     //Config clusterConfig = cluster.getDesiredConfigByType(GLOBAL);
     Collection<Config> clusterConfigs = cluster.getAllConfigs();
 
+    // Apply global properties for this host from all config groups
+    Map<String, Map<String, String>> allConfigTags = configHelper
+        .getEffectiveDesiredTags(cluster, hostname);
+
     for(Config clusterConfig: clusterConfigs) {
       if(!clusterConfig.getType().endsWith("-env")) {
         continue;
@@ -250,10 +262,6 @@ public class HeartbeatMonitor implements Runnable {
         // cluster config for 'global'
         Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
 
-        // Apply global properties for this host from all config groups
-        Map<String, Map<String, String>> allConfigTags = configHelper
-                .getEffectiveDesiredTags(cluster, hostname);
-
         Map<String, Map<String, String>> configTags = new HashMap<String,
                 Map<String, String>>();
 
@@ -294,6 +302,13 @@ public class HeartbeatMonitor implements Runnable {
     statusCmd.setConfigurationAttributes(configurationAttributes);
     statusCmd.setHostname(hostname);
 
+    // If Agent wants the command and the States differ
+    statusCmd.setDesiredState(sch.getDesiredState());
+    if (getAgentRequests().shouldSendExecutionDetails(hostname, componentName)) {
+      LOG.info(componentName + " is at " + sch.getState() + " adding more payload per agent ask");
+      statusCmd.setPayloadLevel(StatusCommand.StatusCommandPayload.EXECUTION_COMMAND);
+    }
+
     // Fill command params
     Map<String, String> commandParams = statusCmd.getCommandParams();
 
@@ -322,7 +337,13 @@ public class HeartbeatMonitor implements Runnable {
     hostLevelParams.put(STACK_NAME, stackId.getStackName());
     hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
 
+
+    if (statusCmd.getPayloadLevel() == StatusCommand.StatusCommandPayload.EXECUTION_COMMAND) {
+      ExecutionCommand ec = ambariManagementController.getExecutionCommand(cluster, sch, RoleCommand.START);
+      statusCmd.setExecutionCommand(ec);
+      LOG.debug(componentName + " has more payload for execution command");
+    }
+
     return statusCmd;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
new file mode 100644
index 0000000..9ebfb49
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.configuration.Configuration;
+
+
+/**
+ * Recovery config to be sent to the agent
+ */
+public class RecoveryConfig {
+
+  /**
+   * Creates a holder for agent requests
+   */
+  public RecoveryConfig() {
+  }
+
+  @SerializedName("type")
+  private String type;
+
+  @SerializedName("maxCount")
+  private String maxCount;
+
+  @SerializedName("windowInMinutes")
+  private String windowInMinutes;
+
+  @SerializedName("retryGap")
+  private String retryGap;
+
+  @SerializedName("maxLifetimeCount")
+  private String maxLifetimeCount;
+
+
+  public String getType() {
+    return type;
+  }
+
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  public String getMaxCount() {
+    return maxCount;
+  }
+
+  public void setMaxCount(String maxCount) {
+    this.maxCount = maxCount;
+  }
+
+  public String getWindowInMinutes() {
+    return windowInMinutes;
+  }
+
+  public void setWindowInMinutes(String windowInMinutes) {
+    this.windowInMinutes = windowInMinutes;
+  }
+
+  public String getRetryGap() {
+    return retryGap;
+  }
+
+  public void setRetryGap(String retryGap) {
+    this.retryGap = retryGap;
+  }
+
+  public String getMaxLifetimeCount() {
+    return maxLifetimeCount;
+  }
+
+  public void setMaxLifetimeCount(String maxLifetimeCount) {
+    this.maxLifetimeCount = maxLifetimeCount;
+  }
+
+  public static RecoveryConfig getRecoveryConfig(Configuration conf) {
+    RecoveryConfig rc = new RecoveryConfig();
+    rc.setMaxCount(conf.getNodeRecoveryMaxCount());
+    rc.setMaxLifetimeCount(conf.getNodeRecoveryLifetimeMaxCount());
+    rc.setRetryGap(conf.getNodeRecoveryRetryGap());
+    rc.setType(conf.getNodeRecoveryType());
+    rc.setWindowInMinutes(conf.getNodeRecoveryWindowInMin());
+    return rc;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("RecoveryConfig{");
+    buffer.append(", type=").append(type);
+    buffer.append(", maxCount=").append(maxCount);
+    buffer.append(", windowInMinutes=").append(windowInMinutes);
+    buffer.append(", retryGap=").append(retryGap);
+    buffer.append(", maxLifetimeCount=").append(maxLifetimeCount);
+    buffer.append('}');
+    return buffer.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
new file mode 100644
index 0000000..f9d3e7c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class RecoveryReport {
+
+  /**
+   * One of DISABLED, RECOVERABLE, UNRECOVERABLE, PARTIALLY_RECOVERABLE
+   */
+  private String summary = "DISABLED";
+  private List<ComponentRecoveryReport> componentReports = new ArrayList<ComponentRecoveryReport>();
+
+
+  @JsonProperty("summary")
+  public String getSummary() {
+    return summary;
+  }
+
+  @JsonProperty("summary")
+  public void setSummary(String summary) {
+    this.summary = summary;
+  }
+
+  @JsonProperty("component_reports")
+  public List<ComponentRecoveryReport> getComponentReports() {
+    return componentReports;
+  }
+
+  @JsonProperty("component_reports")
+  public void setComponentReports(List<ComponentRecoveryReport> componentReports) {
+    this.componentReports = componentReports;
+  }
+
+  @Override
+  public String toString() {
+    String componentReportsStr = "[]";
+    if (componentReports != null) {
+      componentReportsStr = Arrays.toString(componentReports.toArray());
+    }
+    return "RecoveryReport{" +
+           "summary='" + summary + '\'' +
+           ", component_reports='" + componentReportsStr +
+           "'}";
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
index 8a24560..8768a46 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
@@ -58,6 +58,9 @@ public class RegistrationResponse {
   @JsonProperty("responseId")
   private long responseId;
 
+  @JsonProperty("recoveryConfig")
+  private RecoveryConfig recoveryConfig;
+
   @JsonProperty("statusCommands")
   private List<StatusCommand> statusCommands = null;
 
@@ -81,7 +84,6 @@ public class RegistrationResponse {
    * Gets the alert definition commands that contain the alert definitions for
    * each cluster that the host is a member of.
    *
-   * @param commands
    *          the commands, or {@code null} for none.
    */
   public List<AlertDefinitionCommand> getAlertDefinitionCommands() {
@@ -115,6 +117,14 @@ public class RegistrationResponse {
     this.log = log;
   }
 
+  public RecoveryConfig getRecoveryConfig() {
+    return recoveryConfig;
+  }
+
+  public void setRecoveryConfig(RecoveryConfig recoveryConfig) {
+    this.recoveryConfig = recoveryConfig;
+  }
+
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder("RegistrationResponse{");
@@ -122,6 +132,7 @@ public class RegistrationResponse {
     buffer.append(", responseId=").append(responseId);
     buffer.append(", statusCommands=").append(statusCommands);
     buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
+    buffer.append(", recoveryConfig=").append(recoveryConfig);
     buffer.append('}');
     return buffer.toString();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
index 6e08ef0..0e08ed8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
@@ -17,11 +17,12 @@
  */
 package org.apache.ambari.server.agent;
 
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.state.State;
+
 import java.util.HashMap;
 import java.util.Map;
 
-import com.google.gson.annotations.SerializedName;
-
 /**
  * Command to report the status of a list of services in roles.
  */
@@ -55,6 +56,39 @@ public class StatusCommand extends AgentCommand {
   @SerializedName("hostname")
   private String hostname = null;
 
+  @SerializedName("payloadLevel")
+  private StatusCommandPayload payloadLevel = StatusCommandPayload.DEFAULT;
+
+  @SerializedName("desiredState")
+  private State desiredState;
+
+  @SerializedName("executionCommandDetails")
+  private ExecutionCommand executionCommand;
+
+  public ExecutionCommand getExecutionCommand() {
+    return executionCommand;
+  }
+
+  public void setExecutionCommand(ExecutionCommand executionCommand) {
+    this.executionCommand = executionCommand;
+  }
+
+  public State getDesiredState() {
+    return desiredState;
+  }
+
+  public void setDesiredState(State desiredState) {
+    this.desiredState = desiredState;
+  }
+
+  public StatusCommandPayload getPayloadLevel() {
+    return payloadLevel;
+  }
+
+  public void setPayloadLevel(StatusCommandPayload payloadLevel) {
+    this.payloadLevel = payloadLevel;
+  }
+
   public String getClusterName() {
     return clusterName;
   }
@@ -118,4 +152,13 @@ public class StatusCommand extends AgentCommand {
   public String getHostname() {
     return hostname;
   }
+
+  public enum StatusCommandPayload {
+    // The minimal payload for status, agent adds necessary details
+    MINIMAL,
+    // default payload - backward compatible
+    DEFAULT,
+    // has enough details to construct START or INSTALL commands
+    EXECUTION_COMMAND
+  }
 }