You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ha...@apache.org on 2018/05/22 16:04:24 UTC

[ambari] branch trunk updated: [AMBARI-23877] Service auto-start not working (dgrinenko)

This is an automated email from the ASF dual-hosted git repository.

hapylestat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 571dfe6  [AMBARI-23877] Service auto-start not working (dgrinenko)
571dfe6 is described below

commit 571dfe62627e786efad31fa35702c61785004dab
Author: Reishin <ha...@gmail.com>
AuthorDate: Fri May 18 03:44:39 2018 +0300

    [AMBARI-23877] Service auto-start not working (dgrinenko)
---
 .../src/main/python/ambari_agent/ActionQueue.py    |  19 +-
 .../src/main/python/ambari_agent/AmbariConfig.py   |   8 +-
 .../python/ambari_agent/ComponentStatusExecutor.py |   1 -
 .../main/python/ambari_agent/InitializerModule.py  |  24 +-
 .../main/python/ambari_agent/RecoveryManager.py    | 337 ++++++++-------------
 .../python/ambari_agent/TestRecoveryManager.py     | 119 +++++---
 .../apache/ambari/server/agent/RecoveryConfig.java |  22 +-
 .../server/agent/RecoveryConfigComponent.java      | 118 ++++++++
 .../ambari/server/agent/RecoveryConfigHelper.java  |   8 +-
 .../configuration/RecoveryConfigHelperTest.java    |  53 +++-
 10 files changed, 392 insertions(+), 317 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 65239ed..f0c996b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -143,7 +143,7 @@ class ActionQueue(threading.Thread):
           if self.parallel_execution == 0:
             command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
 
-            if command == None:
+            if command is None:
               break
 
             self.process_command(command)
@@ -153,17 +153,16 @@ class ActionQueue(threading.Thread):
             while not self.stop_event.is_set():
               command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
 
-              if command == None:
+              if command is None:
                 break
               # If command is not retry_enabled then do not start them in parallel
               # checking just one command is enough as all commands for a stage is sent
               # at the same time and retry is only enabled for initial start/install
-              retryAble = False
+              retry_able = False
               if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
-                retryAble = command['commandParams']['command_retry_enabled'] == "true"
-              if retryAble:
-                logger.info("Kicking off a thread for the command, id=" +
-                            str(command['commandId']) + " taskId=" + str(command['taskId']))
+                retry_able = command['commandParams']['command_retry_enabled'] == "true"
+              if retry_able:
+                logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
                 t = threading.Thread(target=self.process_command, args=(command,))
                 t.daemon = True
                 t.start()
@@ -172,14 +171,14 @@ class ActionQueue(threading.Thread):
                 break
               pass
             pass
-        except (Queue.Empty):
+        except Queue.Empty:
           pass
-      except:
+      except Exception:
         logger.exception("ActionQueue thread failed with exception. Re-running it")
     logger.info("ActionQueue thread has successfully finished")
 
   def fillRecoveryCommands(self):
-    if not self.tasks_in_progress_or_pending():
+    if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending():
       self.put(self.recovery_manager.get_recovery_commands())
 
   def processBackgroundQueueSafeEmpty(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 1e95fbe..88aa8ea 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -186,21 +186,23 @@ class AmbariConfig:
   @property
   def cluster_cache_dir(self):
     return os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
-  @property
-  def recovery_cache_dir(self):
-    return os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
+
   @property
   def alerts_cachedir(self):
     return os.path.join(self.cache_dir, FileCache.ALERTS_CACHE_DIRECTORY)
+
   @property
   def stacks_dir(self):
     return os.path.join(self.cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
+
   @property
   def common_services_dir(self):
     return os.path.join(self.cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
+
   @property
   def extensions_dir(self):
     return os.path.join(self.cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY)
+
   @property
   def host_scripts_dir(self):
     return os.path.join(self.cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index c9f86da..5d20495 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -154,7 +154,6 @@ class ComponentStatusExecutor(threading.Thread):
         self.send_updates_to_server({cluster_id: [result]})
 
       return result
-
     return None
 
   def send_updates_to_server(self, cluster_reports):
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 052e8c1..787df16 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -46,6 +46,7 @@ from ambari_agent.AlertStatusReporter import AlertStatusReporter
 
 logger = logging.getLogger(__name__)
 
+
 class InitializerModule:
   """
   - Instantiate some singleton classes or widely used instances along with providing their dependencies.
@@ -56,6 +57,23 @@ class InitializerModule:
   def __init__(self):
     self.stop_event = threading.Event()
     self.config = AmbariConfig.get_resolved_config()
+
+    self.is_registered = None
+    self.metadata_cache = None
+    self.topology_cache = None
+    self.host_level_params_cache = None
+    self.configurations_cache = None
+    self.alert_definitions_cache = None
+    self.configuration_builder = None
+    self.stale_alerts_monitor = None
+    self.server_responses_listener = None
+    self.file_cache = None
+    self.customServiceOrchestrator = None
+    self.recovery_manager = None
+    self.commandStatuses = None
+    self.action_queue = None
+    self.alert_scheduler_handler = None
+
     self.init()
 
   def init(self):
@@ -71,14 +89,10 @@ class InitializerModule:
     self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir)
     self.configuration_builder = ConfigurationBuilder(self)
     self.stale_alerts_monitor = StaleAlertsMonitor(self)
-
     self.server_responses_listener = ServerResponsesListener()
-
     self.file_cache = FileCache(self.config)
-
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
-
-    self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir)
+    self.recovery_manager = RecoveryManager()
     self.commandStatuses = CommandStatusDict(self)
 
     self.init_threads()
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 3a20b20..f7ec134 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -15,10 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
 import logging
 import copy
-import os
 import time
 import threading
 import pprint
@@ -29,17 +25,17 @@ from ambari_agent.LiveStatus import LiveStatus
 
 logger = logging.getLogger()
 
-"""
-RecoveryManager has the following capabilities:
-* Store data needed for execution commands extracted from STATUS command
-* Generate INSTALL command
-* Generate START command
-"""
-
 
 class RecoveryManager:
+  """
+  RecoveryManager has the following capabilities:
+  * Store data needed for execution commands extracted from STATUS command
+  * Generate INSTALL command
+  * Generate START command
+  """
   COMMAND_TYPE = "commandType"
   PAYLOAD_LEVEL = "payloadLevel"
+  SERVICE_NAME = "serviceName"
   COMPONENT_NAME = "componentName"
   ROLE = "role"
   TASK_ID = "taskId"
@@ -57,7 +53,7 @@ class RecoveryManager:
   INIT = "INIT"  # TODO: What is the state when machine is reset
   INSTALL_FAILED = "INSTALL_FAILED"
   COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
-  COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
+  COMMAND_REFRESH_DELAY_SEC = 600
 
   FILENAME = "recovery.json"
 
@@ -77,13 +73,15 @@ class RecoveryManager:
     "stale_config": False
   }
 
-  def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
+  def __init__(self, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
     self.recovery_enabled = recovery_enabled
     self.auto_start_only = auto_start_only
     self.auto_install_start = auto_install_start
     self.max_count = 6
     self.window_in_min = 60
     self.retry_gap = 5
+    self.window_in_sec = self.window_in_min * 60
+    self.retry_gap_in_sec = self.retry_gap * 60
     self.max_lifetime_count = 12
 
     self.id = int(time.time())
@@ -91,6 +89,7 @@ class RecoveryManager:
     self.allowed_current_states = [self.INIT, self.INSTALLED]
     self.enabled_components = []
     self.statuses = {}
+    self.__component_to_service_map = {}   # component => service map TODO: fix it later(hack here)
     self.__status_lock = threading.RLock()
     self.__command_lock = threading.RLock()
     self.__active_command_lock = threading.RLock()
@@ -98,29 +97,16 @@ class RecoveryManager:
     self.active_command_count = 0
     self.cluster_id = None
 
-    if not os.path.exists(cache_dir):
-      try:
-        os.makedirs(cache_dir)
-      except:
-        logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))
-
-    self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)
-
     self.actions = {}
-
     self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
 
-    pass
-
   def on_execution_command_start(self):
     with self.__active_command_lock:
       self.active_command_count += 1
-    pass
 
   def on_execution_command_finish(self):
     with self.__active_command_lock:
       self.active_command_count -= 1
-    pass
 
   def has_active_command(self):
     return self.active_command_count > 0
@@ -131,12 +117,10 @@ class RecoveryManager:
   def get_current_status(self, component):
     if component in self.statuses:
       return self.statuses[component]["current"]
-    pass
 
   def get_desired_status(self, component):
     if component in self.statuses:
       return self.statuses[component]["desired"]
-    pass
 
   def update_config_staleness(self, component, is_config_stale):
     """
@@ -154,17 +138,10 @@ class RecoveryManager:
       pass
 
     self.statuses[component]["stale_config"] = is_config_stale
-    pass
 
   def handle_status_change(self, component, component_status):
-    if not self.enabled() or not self.configured_for_recovery(component):
-      return
-
-    if component_status == LiveStatus.LIVE_STATUS:
-        self.update_current_status(component, component_status)
-    else:
-        if (self.get_current_status(component) != self.INSTALL_FAILED):
-          self.update_current_status(component, component_status)
+    if component_status == LiveStatus.LIVE_STATUS or self.get_current_status(component) != self.INSTALL_FAILED:
+      self.update_current_status(component, component_status)
 
   def update_current_status(self, component, state):
     """
@@ -184,9 +161,8 @@ class RecoveryManager:
 
     if self.statuses[component]["current"] != state:
       logger.info("current status is set to %s for %s", state, component)
-    self.statuses[component]["current"] = state
-    pass
 
+    self.statuses[component]["current"] = state
 
   def update_desired_status(self, component, state):
     """
@@ -202,21 +178,16 @@ class RecoveryManager:
           logger.info("New status, desired status is set to %s for %s", self.statuses[component]["desired"], component)
       finally:
         self.__status_lock.release()
-      pass
 
     if self.statuses[component]["desired"] != state:
       logger.info("desired status is set to %s for %s", state, component)
     self.statuses[component]["desired"] = state
-    pass
 
-  """
-  Whether specific components are enabled for recovery.
-  """
   def configured_for_recovery(self, component):
-    if len(self.enabled_components) > 0 and component in self.enabled_components:
-      return True
-
-    return False
+    """
+    Whether specific components are enabled for recovery.
+    """
+    return len(self.enabled_components) > 0 and component in self.enabled_components
 
   def requires_recovery(self, component):
     """
@@ -225,23 +196,15 @@ class RecoveryManager:
     INIT --> INSTALLED --> STARTED
     RE-INSTALLED (if configs do not match)
     """
-    if not self.enabled():
-      return False
-
-    if not self.configured_for_recovery(component):
-      return False
-
-    if component not in self.statuses:
+    if not self.enabled() or not self.configured_for_recovery(component) or component not in self.statuses:
       return False
 
     status = self.statuses[component]
     if self.auto_start_only or self.auto_install_start:
-      if status["current"] == status["desired"]:
-        return False
-      if status["desired"] not in self.allowed_desired_states:
+      if status["current"] == status["desired"] or status["desired"] not in self.allowed_desired_states:
         return False
     else:
-      if status["current"] == status["desired"] and status['stale_config'] == False:
+      if status["current"] == status["desired"] and status['stale_config'] is False:
         return False
 
     if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
@@ -249,9 +212,6 @@ class RecoveryManager:
 
     logger.info("%s needs recovery, desired = %s, and current = %s.", component, status["desired"], status["current"])
     return True
-    pass
-
-
 
   def get_recovery_status(self):
     """
@@ -268,8 +228,7 @@ class RecoveryManager:
       ]
     }
     """
-    report = {}
-    report["summary"] = "DISABLED"
+    report = {"summary": "DISABLED"}
     if self.enabled():
       report["summary"] = "RECOVERABLE"
       num_limits_reached = 0
@@ -279,24 +238,23 @@ class RecoveryManager:
       try:
         for component in self.actions.keys():
           action = self.actions[component]
-          recovery_state = {}
-          recovery_state["name"] = component
-          recovery_state["numAttempts"] = action["lifetimeCount"]
-          recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
+          recovery_state = {
+            "name": component,
+            "numAttempts": action["lifetimeCount"],
+            "limitReached": self.max_lifetime_count <= action["lifetimeCount"]
+          }
           recovery_states.append(recovery_state)
-          if recovery_state["limitReached"] == True:
+          if recovery_state["limitReached"] is True:
             num_limits_reached += 1
-          pass
       finally:
         self.__status_lock.release()
 
-      if num_limits_reached > 0:
+      if num_limits_reached > 0 and num_limits_reached == len(recovery_states):
+        report["summary"] = "UNRECOVERABLE"
+      elif num_limits_reached > 0:
         report["summary"] = "PARTIALLY_RECOVERABLE"
-        if num_limits_reached == len(recovery_states):
-          report["summary"] = "UNRECOVERABLE"
 
     return report
-    pass
 
   def get_recovery_commands(self):
     """
@@ -308,39 +266,34 @@ class RecoveryManager:
     """
     commands = []
     for component in self.statuses.keys():
-      if self.requires_recovery(component) and self.may_execute(component):
+      if self.configured_for_recovery(component) and self.requires_recovery(component) and self.may_execute(component):
         status = copy.deepcopy(self.statuses[component])
         command = None
         if self.auto_start_only:
-          if status["desired"] == self.STARTED:
-            if status["current"] == self.INSTALLED:
-              command = self.get_start_command(component)
+          if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+            command = self.get_start_command(component)
         elif self.auto_install_start:
-          if status["desired"] == self.STARTED:
-            if status["current"] == self.INSTALLED:
-              command = self.get_start_command(component)
-            elif status["current"] == self.INSTALL_FAILED:
-              command = self.get_install_command(component)
-          elif status["desired"] == self.INSTALLED:
-            if status["current"] == self.INSTALL_FAILED:
+          if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+            command = self.get_start_command(component)
+          elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
+            command = self.get_install_command(component)
+          elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
               command = self.get_install_command(component)
         else:
           # START, INSTALL, RESTART
           if status["desired"] != status["current"]:
-            if status["desired"] == self.STARTED:
-              if status["current"] == self.INSTALLED:
-                command = self.get_start_command(component)
-              elif status["current"] == self.INIT:
-                command = self.get_install_command(component)
-              elif status["current"] == self.INSTALL_FAILED:
-                command = self.get_install_command(component)
-            elif status["desired"] == self.INSTALLED:
-              if status["current"] == self.INIT:
-                command = self.get_install_command(component)
-              elif status["current"] == self.INSTALL_FAILED:
-                command = self.get_install_command(component)
-              elif status["current"] == self.STARTED:
-                command = self.get_stop_command(component)
+            if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+              command = self.get_start_command(component)
+            elif status["desired"] == self.STARTED and status["current"] == self.INIT:
+              command = self.get_install_command(component)
+            elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
+              command = self.get_install_command(component)
+            elif status["desired"] == self.INSTALLED and status["current"] == self.INIT:
+              command = self.get_install_command(component)
+            elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
+              command = self.get_install_command(component)
+            elif status["desired"] == self.INSTALLED and status["current"] == self.STARTED:
+              command = self.get_stop_command(component)
           else:
             if status["current"] == self.INSTALLED:
               command = self.get_install_command(component)
@@ -349,11 +302,10 @@ class RecoveryManager:
 
         if command:
           self.execute(component)
-          logger.info("Created recovery command %s for component %s",
-                    command[self.ROLE_COMMAND], command[self.ROLE])
+          logger.info("Created recovery command %s for component %s", command[self.ROLE_COMMAND], command[self.ROLE])
           commands.append(command)
-    return commands
 
+    return commands
 
   def may_execute(self, action):
     """
@@ -369,8 +321,6 @@ class RecoveryManager:
       finally:
         self.__status_lock.release()
     return self._execute_action_chk_only(action)
-    pass
-
 
   def execute(self, action):
     """
@@ -386,8 +336,6 @@ class RecoveryManager:
       finally:
         self.__status_lock.release()
     return self._execute_action_(action)
-    pass
-
 
   def _execute_action_(self, action_name):
     """
@@ -398,7 +346,7 @@ class RecoveryManager:
     executed = False
     seconds_since_last_attempt = now - action_counter["lastAttempt"]
     if action_counter["lifetimeCount"] < self.max_lifetime_count:
-      #reset if window_in_sec seconds passed since last attempt
+      # reset if window_in_sec seconds passed since last attempt
       if seconds_since_last_attempt > self.window_in_sec:
         action_counter["count"] = 0
         action_counter["lastReset"] = now
@@ -406,7 +354,7 @@ class RecoveryManager:
       if action_counter["count"] < self.max_count:
         if seconds_since_last_attempt > self.retry_gap_in_sec:
           action_counter["count"] += 1
-          action_counter["lifetimeCount"] +=1
+          action_counter["lifetimeCount"] += 1
           if self.retry_gap > 0:
             action_counter["lastAttempt"] = now
           action_counter["warnedLastAttempt"] = False
@@ -414,28 +362,27 @@ class RecoveryManager:
             action_counter["lastReset"] = now
           executed = True
         else:
-          if action_counter["warnedLastAttempt"] == False:
+          if action_counter["warnedLastAttempt"] is False:
             action_counter["warnedLastAttempt"] = True
             logger.warn(
               "%s seconds has not passed since last occurrence %s seconds back for %s. " +
               "Will silently skip execution without warning till retry gap is passed",
               self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
           else:
-            logger.debug(
-              "%s seconds has not passed since last occurrence %s seconds back for %s",
-              self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+            logger.debug("%s seconds has not passed since last occurrence %s seconds back for %s",
+                         self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
       else:
         sec_since_last_reset = now - action_counter["lastReset"]
         if sec_since_last_reset > self.window_in_sec:
           action_counter["count"] = 1
-          action_counter["lifetimeCount"] +=1
+          action_counter["lifetimeCount"] += 1
           if self.retry_gap > 0:
             action_counter["lastAttempt"] = now
           action_counter["lastReset"] = now
           action_counter["warnedLastReset"] = False
           executed = True
         else:
-          if action_counter["warnedLastReset"] == False:
+          if action_counter["warnedLastReset"] is False:
             action_counter["warnedLastReset"] = True
             logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
                         "Will silently skip execution without warning till window is reset",
@@ -444,7 +391,7 @@ class RecoveryManager:
             logger.debug("%s occurrences in %s minutes reached the limit for %s",
                          action_counter["count"], self.window_in_min, action_name)
     else:
-      if action_counter["warnedThresholdReached"] == False:
+      if action_counter["warnedThresholdReached"] is False:
         action_counter["warnedThresholdReached"] = True
         logger.warn("%s occurrences in agent life time reached the limit for %s. " +
                     "Will silently skip execution without warning till window is reset",
@@ -452,47 +399,7 @@ class RecoveryManager:
       else:
         logger.error("%s occurrences in agent life time reached the limit for %s",
                      action_counter["lifetimeCount"], action_name)
-    self._dump_actions()
     return executed
-    pass
-
-
-  def _dump_actions(self):
-    """
-    Dump recovery actions to FS
-    """
-    self.__cache_lock.acquire()
-    try:
-      with open(self.__actions_json_file, 'w') as f:
-        json.dump(self.actions, f, indent=2)
-    except Exception, exception:
-      logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
-      return False
-    finally:
-      self.__cache_lock.release()
-
-    return True
-    pass
-
-
-  def _load_actions(self):
-    """
-    Loads recovery actions from FS
-    """
-    self.__cache_lock.acquire()
-
-    try:
-      if os.path.isfile(self.__actions_json_file):
-        with open(self.__actions_json_file, 'r') as fp:
-          return json.load(fp)
-    except Exception, exception:
-      logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
-    finally:
-      self.__cache_lock.release()
-
-    return {}
-    pass
-
 
   def get_actions_copy(self):
     """
@@ -503,8 +410,6 @@ class RecoveryManager:
       return copy.deepcopy(self.actions)
     finally:
       self.__status_lock.release()
-    pass
-
 
   def is_action_info_stale(self, action_name):
     """
@@ -518,7 +423,6 @@ class RecoveryManager:
       seconds_since_last_attempt = now - action_counter["lastAttempt"]
       return seconds_since_last_attempt > self.window_in_sec
     return False
-    pass
 
   def _execute_action_chk_only(self, action_name):
     """
@@ -538,12 +442,9 @@ class RecoveryManager:
           return True
 
     return False
-    pass
 
   def _now_(self):
     return int(time.time())
-    pass
-
 
   def update_recovery_config(self, dictionary):
     """
@@ -564,8 +465,7 @@ class RecoveryManager:
     window_in_min = 60
     retry_gap = 5
     max_lifetime_count = 12
-    enabled_components = ""
-
+    enabled_components = []
 
     if dictionary and "recoveryConfig" in dictionary:
       if logger.isEnabledFor(logging.INFO):
@@ -593,27 +493,26 @@ class RecoveryManager:
 
     self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
                        auto_install_start, enabled_components)
-    pass
 
-  """
-  Update recovery configuration with the specified values.
-
-  max_count - Configured maximum count of recovery attempt allowed per host component in a window.
-  window_in_min - Configured window size in minutes.
-  retry_gap - Configured retry gap between tries per host component
-  max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
-  recovery_enabled - True or False. Indicates whether recovery is enabled or not.
-  auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
-  auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
-  enabled_components - CSV of componenents enabled for auto start.
-  """
   def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
                     auto_start_only, auto_install_start, enabled_components):
     """
+    Update recovery configuration with the specified values.
+
+    max_count - Configured maximum count of recovery attempt allowed per host component in a window.
+    window_in_min - Configured window size in minutes.
+    retry_gap - Configured retry gap between tries per host component
+    max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
+    recovery_enabled - True or False. Indicates whether recovery is enabled or not.
+    auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
+    auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
+    enabled_components - list of dicts (service_name, component_name, desired_state) for components enabled for auto start.
+
+
     Update recovery configuration, recovery is disabled if configuration values
     are not correct
     """
-    self.recovery_enabled = False;
+    self.recovery_enabled = False
     if max_count <= 0:
       logger.warn("Recovery disabled: max_count must be a non-negative number")
       return
@@ -653,10 +552,17 @@ class RecoveryManager:
       self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
 
     if enabled_components is not None and len(enabled_components) > 0:
-      components = enabled_components.split(",")
-      for component in components:
-        if len(component.strip()) > 0:
-          self.enabled_components.append(component.strip())
+      components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components]
+      for service, component, state in components:
+        self.enabled_components.append(component)
+        self.update_desired_status(component, state)
+
+        # Recovery Manager is component-oriented; however, the Agent requires both service and component names to
+        # build commands properly. As a workaround, the server pushes the service name and the agent keeps the relation.
+        #
+        # It is important to keep this map up to date, so an entry is overwritten whenever the server
+        # pushes a different service <-> component relation.
+        self.__component_to_service_map[component] = service
 
     self.recovery_enabled = recovery_enabled
     if self.recovery_enabled:
@@ -665,8 +571,6 @@ class RecoveryManager:
         " lifetime max being %s. Enabled components - %s",
         self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
         ', '.join(self.enabled_components))
-    pass
-
 
   def get_unique_task_id(self):
     self.id += 1
@@ -679,33 +583,31 @@ class RecoveryManager:
     if not self.enabled():
       return
 
-    if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']):
+    if self.ROLE_COMMAND not in command or not self.configured_for_recovery(command['role']):
       return
 
     if status == ActionQueue.COMPLETED_STATUS:
       if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
         self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
-        #self.update_config_staleness(command['role'], False)
-        logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
-                    ", current state of " + command[self.ROLE] + " to " +
-                     self.get_current_status(command[self.ROLE]) )
+        logger.info("After EXECUTION_COMMAND (START), with taskId={}, current state of {} to {}".format(
+          command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
       elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
         self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
-        logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
-                    ", current state of " + command[self.ROLE] + " to " +
-                     self.get_current_status(command[self.ROLE]) )
+        logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId={}, current state of {} to {}".format(
+          command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
       elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
-        if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
+        if 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
           self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
-          #self.update_config_staleness(command['role'], False)
-          logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " +
-                       self.get_current_status(command[self.ROLE]) )
+          logger.info("After EXECUTION_COMMAND (RESTART), current state of {} to {}".format(
+            command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
     elif status == ActionQueue.FAILED_STATUS:
       if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
         self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
-        logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
-                    ", current state of " + command[self.ROLE] + " to " +
-                    self.get_current_status(command[self.ROLE]))
+        logger.info("After EXECUTION_COMMAND (INSTALL), with taskId={}, current state of {} to {}".format(
+          command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
 
   def process_execution_command(self, command):
     """
@@ -714,28 +616,30 @@ class RecoveryManager:
     if not self.enabled():
       return
 
-    if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+    if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
       return
 
-    if not self.ROLE in command:
+    if self.ROLE not in command:
       return
 
     if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
         and self.configured_for_recovery(command[self.ROLE]):
+
       self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
-      logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
-                   self.get_desired_status(command[self.ROLE]) )
-    elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
-        and self.configured_for_recovery(command[self.ROLE]):
+      logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of {} to {}".format(
+        command[self.ROLE], self.get_desired_status(command[self.ROLE])))
+
+    elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START and self.configured_for_recovery(command[self.ROLE]):
       self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
-      logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
-                   self.get_desired_status(command[self.ROLE]) )
-    elif command.has_key('custom_command') and \
-            command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
+      logger.info("Received EXECUTION_COMMAND (START), desired state of {} to {}".format(
+        command[self.ROLE], self.get_desired_status(command[self.ROLE])))
+
+    elif 'custom_command' in command and  command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
             and self.configured_for_recovery(command[self.ROLE]):
+
       self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
-      logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
-                   self.get_desired_status(command[self.ROLE]) )
+      logger.info("Received EXECUTION_COMMAND (RESTART), desired state of {} to {}".format(
+        command[self.ROLE], self.get_desired_status(command[self.ROLE])))
 
   def get_command(self, component, command_name):
     """
@@ -755,6 +659,10 @@ class RecoveryManager:
         self.ROLE: component,
         self.COMMAND_ID: command_id
       }
+
+      if component in self.__component_to_service_map:
+        command[self.SERVICE_NAME] = self.__component_to_service_map[component]
+
       return command
     else:
       logger.info("Recovery is not enabled. START command will not be computed.")
@@ -786,12 +694,3 @@ class RecoveryManager:
     except ValueError:
       pass
     return int_value
-
-
-def main(argv=None):
-  cmd_mgr = RecoveryManager('/tmp')
-  pass
-
-
-if __name__ == '__main__':
-  main()
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index b9800bb..432e74b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -116,14 +116,8 @@ class _TestRecoveryManager(TestCase):
     }
   }
 
-  def setUp(self):
-    pass
-
-  def tearDown(self):
-    pass
-
   def test_defaults(self):
-    rm = RecoveryManager(tempfile.mktemp())
+    rm = RecoveryManager()
     self.assertFalse(rm.enabled())
     self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
     self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
@@ -131,7 +125,6 @@ class _TestRecoveryManager(TestCase):
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertFalse(rm.requires_recovery("NODEMANAGER"))
-    pass
 
   @patch.object(RecoveryManager, "_now_")
   def test_sliding_window(self, time_mock):
@@ -139,7 +132,7 @@ class _TestRecoveryManager(TestCase):
       [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
        1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
 
-    rm = RecoveryManager(tempfile.mktemp(), True, False)
+    rm = RecoveryManager(True, False)
     self.assertTrue(rm.enabled())
 
     config = rm.update_config(0, 60, 5, 12, True, False, False, "")
@@ -206,11 +199,12 @@ class _TestRecoveryManager(TestCase):
     # lifetime max reached
     self.assertTrue(rm.execute("NODEMANAGER2"))
     self.assertFalse(rm.execute("NODEMANAGER2"))
-    pass
 
   def test_recovery_required(self):
-    rm = RecoveryManager(tempfile.mktemp(), True, False)
-    rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER")
+    rm = RecoveryManager(True, False)
+    rm.update_config(12, 5, 1, 15, True, False, False, [
+      {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+    ])
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
     self.assertFalse(rm.requires_recovery("NODEMANAGER"))
@@ -239,7 +233,7 @@ class _TestRecoveryManager(TestCase):
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(tempfile.mktemp(), True, True)
+    rm = RecoveryManager(True, True)
 
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -253,18 +247,20 @@ class _TestRecoveryManager(TestCase):
     rm.update_desired_status("NODEMANAGER", "START")
     self.assertFalse(rm.requires_recovery("NODEMANAGER"))
 
-    pass
-
   def test_recovery_required2(self):
 
-    rm = RecoveryManager(tempfile.mktemp(), True, True)
-    rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+    rm = RecoveryManager(True, True)
+    rm.update_config(15, 5, 1, 16, True, False, False, [
+      {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+    ])
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
 
-    rm = RecoveryManager(tempfile.mktemp(), True, True)
-    rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+    rm = RecoveryManager( True, True)
+    rm.update_config(15, 5, 1, 16, True, False, False, [
+      {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+    ])
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -273,7 +269,7 @@ class _TestRecoveryManager(TestCase):
     rm.update_desired_status("DATANODE", "STARTED")
     self.assertFalse(rm.requires_recovery("DATANODE"))
 
-    rm = RecoveryManager(tempfile.mktemp(), True, True)
+    rm = RecoveryManager(True, True)
     rm.update_config(15, 5, 1, 16, True, False, False, "")
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -283,7 +279,9 @@ class _TestRecoveryManager(TestCase):
     rm.update_desired_status("DATANODE", "STARTED")
     self.assertFalse(rm.requires_recovery("DATANODE"))
 
-    rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+    rm.update_config(15, 5, 1, 16, True, False, False, [
+      {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+    ])
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -291,31 +289,30 @@ class _TestRecoveryManager(TestCase):
     rm.update_current_status("DATANODE", "INSTALLED")
     rm.update_desired_status("DATANODE", "STARTED")
     self.assertFalse(rm.requires_recovery("DATANODE"))
-    pass
 
   @patch.object(RecoveryManager, "update_config")
   def test_update_rm_config(self, mock_uc):
-    rm = RecoveryManager(tempfile.mktemp())
+    rm = RecoveryManager()
     rm.update_recovery_config(None)
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config({})
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config(
       {"recoveryConfig": {
       "type" : "DEFAULT"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config(
       {"recoveryConfig": {
         "type" : "FULL"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config(
@@ -323,7 +320,7 @@ class _TestRecoveryManager(TestCase):
         "type" : "AUTO_START",
         "max_count" : "med"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config(
@@ -331,28 +328,41 @@ class _TestRecoveryManager(TestCase):
         "type" : "AUTO_INSTALL_START",
         "max_count" : "med"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "")])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, [])])
 
     mock_uc.reset_mock()
     rm.update_recovery_config(
       {"recoveryConfig": {
-        "type" : "AUTO_START",
-        "maxCount" : "5",
+        "type": "AUTO_START",
+        "maxCount": "5",
         "windowInMinutes" : 20,
-        "retryGap" : 2,
+        "retryGap": 2,
         "maxLifetimeCount" : 5,
-        "components" : " A,B",
-        "recoveryTimestamp" : 1}}
+        "components": [
+          {
+            "service_name": "A",
+            "component_name": "A",
+            "desired_state": "INSTALLED"
+          },
+          {
+            "service_name": "B",
+            "component_name": "B",
+            "desired_state": "INSTALLED"
+          }
+        ],
+        "recoveryTimestamp": 1}}
     )
-    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B")])
-  pass
+    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, [
+      {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+      {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'}
+    ])])
 
   @patch.object(RecoveryManager, "_now_")
   def test_recovery_report(self, time_mock):
     time_mock.side_effect = \
       [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1815]
 
-    rm = RecoveryManager(tempfile.mktemp())
+    rm = RecoveryManager()
     rec_st = rm.get_recovery_status()
     self.assertEquals(rec_st, {"summary": "DISABLED"})
 
@@ -392,20 +402,21 @@ class _TestRecoveryManager(TestCase):
                                  {"name": "LION", "numAttempts": 4, "limitReached": True},
                                  {"name": "PUMA", "numAttempts": 4, "limitReached": True}
                                ]})
-    pass
 
   @patch.object(RecoveryManager, "_now_")
   def test_command_expiry(self, time_mock):
     time_mock.side_effect = \
       [1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
 
-    rm = RecoveryManager(tempfile.mktemp(), True)
+    rm = RecoveryManager(True)
     rm.update_config(5, 5, 0, 11, True, False, False, "")
 
     command1 = copy.deepcopy(self.command)
 
     #rm.store_or_update_command(command1)
-    rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER")
+    rm.update_config(12, 5, 1, 15, True, False, False, [
+      {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+    ])
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
 
@@ -426,28 +437,38 @@ class _TestRecoveryManager(TestCase):
     commands = rm.get_recovery_commands()
     self.assertEqual(1, len(commands))
     self.assertEqual("START", commands[0]["roleCommand"])
-    pass
 
   def test_configured_for_recovery(self):
-    rm = RecoveryManager(tempfile.mktemp(), True)
-    rm.update_config(12, 5, 1, 15, True, False, False, "A,B")
+    rm = RecoveryManager(True)
+    rm.update_config(12, 5, 1, 15, True, False, False, [
+      {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+      {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'},
+    ])
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertTrue(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, False, "")
+    rm.update_config(5, 5, 1, 11, True, False, False, [])
     self.assertFalse(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, False, "A")
+    rm.update_config(5, 5, 1, 11, True, False, False, [
+      {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
+    ])
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, False, "A")
+    rm.update_config(5, 5, 1, 11, True, False, False, [
+      {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
+    ])
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
     self.assertFalse(rm.configured_for_recovery("C"))
 
-    rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ")
+    rm.update_config(5, 5, 1, 11, True, False, False, [
+      {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+      {'component_name': 'D', 'service_name': 'D', 'desired_state': 'INSTALLED'},
+      {'component_name': 'F', 'service_name': 'F', 'desired_state': 'INSTALLED'}
+    ])
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
     self.assertFalse(rm.configured_for_recovery("C"))
@@ -459,7 +480,7 @@ class _TestRecoveryManager(TestCase):
   def test_reset_if_window_passed_since_last_attempt(self, time_mock):
     time_mock.side_effect = \
       [1000, 1071, 1372]
-    rm = RecoveryManager(tempfile.mktemp(), True)
+    rm = RecoveryManager(True)
 
     rm.update_config(2, 5, 1, 4, True, True, False, "")
 
@@ -478,7 +499,7 @@ class _TestRecoveryManager(TestCase):
   @patch.object(RecoveryManager, "_now_")
   def test_is_action_info_stale(self, time_mock):
 
-    rm = RecoveryManager(tempfile.mktemp(), True)
+    rm = RecoveryManager(True)
     rm.update_config(5, 60, 5, 16, True, False, False, "")
 
     time_mock.return_value = 0
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
index 8e2078d..9e3e455 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -18,8 +18,10 @@
 
 package org.apache.ambari.server.agent;
 
-import com.google.gson.annotations.SerializedName;
+import java.util.List;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
 
 /**
  * Recovery config to be sent to the agent
@@ -33,34 +35,34 @@ public class RecoveryConfig {
   }
 
   @SerializedName("type")
-  @com.fasterxml.jackson.annotation.JsonProperty("type")
+  @JsonProperty("type")
   private String type;
 
   @SerializedName("maxCount")
-  @com.fasterxml.jackson.annotation.JsonProperty("maxCount")
+  @JsonProperty("maxCount")
   private String maxCount;
 
   @SerializedName("windowInMinutes")
-  @com.fasterxml.jackson.annotation.JsonProperty("windowInMinutes")
+  @JsonProperty("windowInMinutes")
   private String windowInMinutes;
 
   @SerializedName("retryGap")
-  @com.fasterxml.jackson.annotation.JsonProperty("retryGap")
+  @JsonProperty("retryGap")
   private String retryGap;
 
   @SerializedName("maxLifetimeCount")
-  @com.fasterxml.jackson.annotation.JsonProperty("maxLifetimeCount")
+  @JsonProperty("maxLifetimeCount")
   private String maxLifetimeCount;
 
   @SerializedName("components")
-  @com.fasterxml.jackson.annotation.JsonProperty("components")
-  private String enabledComponents;
+  @JsonProperty("components")
+  private List<RecoveryConfigComponent> enabledComponents;
 
-  public String getEnabledComponents() {
+  public List<RecoveryConfigComponent> getEnabledComponents() {
     return enabledComponents;
   }
 
-  public void setEnabledComponents(String enabledComponents) {
+  public void setEnabledComponents(List<RecoveryConfigComponent> enabledComponents) {
     this.enabledComponents = enabledComponents;
   }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java
new file mode 100644
index 0000000..50f13b4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import java.util.Objects;
+
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.State;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Holder for a recovery-enabled component: its component name, service name and desired state
+ */
+public class RecoveryConfigComponent{
+
+  @SerializedName("component_name")
+  @JsonProperty("component_name")
+  private String componentName;
+
+  @SerializedName("service_name")
+  @JsonProperty("service_name")
+  private String serviceName;
+
+  @SerializedName("desired_state")
+  @JsonProperty("desired_state")
+  private String desiredState;
+
+  /**
+   * Creates new instance of {@link RecoveryConfigComponent}
+   * @param componentName name of the component
+   * @param desiredState desired state of the component
+   */
+  public RecoveryConfigComponent(String componentName, String serviceName, State desiredState){
+    this.setComponentName(componentName);
+    this.setServiceName(serviceName);
+    this.setDesiredState(desiredState);
+  }
+
+  /**
+   * Creates {@link RecoveryConfigComponent} instance from initialized {@link ServiceComponentHost}
+   */
+  public RecoveryConfigComponent(ServiceComponentHost sch) {
+    this(sch.getServiceComponentName(), sch.getServiceName(), sch.getDesiredState());
+  }
+
+  public String getComponentName() {
+    return componentName;
+  }
+
+  public void setComponentName(String componentName) {
+    this.componentName = componentName;
+  }
+
+  public State getDesiredState() {
+    return State.valueOf(desiredState);
+  }
+
+
+  public void setDesiredState(State state) {
+    this.desiredState = state.toString();
+  }
+
+  @Override
+  public String toString(){
+    StringBuilder sb = new StringBuilder("{")
+      .append("componentName=").append(componentName)
+      .append(", serviceName=").append(serviceName)
+      .append(", desiredState=").append(desiredState)
+      .append("}");
+    return sb.toString();
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  @Override
+  public boolean equals(Object o){
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final RecoveryConfigComponent that = (RecoveryConfigComponent) o;
+    return Objects.equals(componentName, that.componentName) &&
+      Objects.equals(serviceName, that.serviceName) &&
+      Objects.equals(desiredState, that.desiredState);
+  }
+
+  @Override
+  public int hashCode(){
+    return Objects.hash(componentName, serviceName, desiredState);
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
index 668b65e..fbd8a7f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
@@ -108,7 +108,7 @@ public class RecoveryConfigHelper {
     recoveryConfig.setType(autoStartConfig.getNodeRecoveryType());
     recoveryConfig.setWindowInMinutes(autoStartConfig.getNodeRecoveryWindowInMin());
     if (autoStartConfig.isRecoveryEnabled()) {
-      recoveryConfig.setEnabledComponents(StringUtils.join(autoStartConfig.getEnabledComponents(hostname), ','));
+      recoveryConfig.setEnabledComponents(autoStartConfig.getEnabledComponents(hostname));
     }
 
     return recoveryConfig;
@@ -316,8 +316,8 @@ public class RecoveryConfigHelper {
      * maintenance mode.
      * @return
      */
-    private List<String> getEnabledComponents(String hostname) throws AmbariException {
-      List<String> enabledComponents = new ArrayList<>();
+    private List<RecoveryConfigComponent> getEnabledComponents(String hostname) throws AmbariException {
+      List<RecoveryConfigComponent> enabledComponents = new ArrayList<>();
 
       if (cluster == null) {
         return enabledComponents;
@@ -343,7 +343,7 @@ public class RecoveryConfigHelper {
           if (service.getMaintenanceState() == MaintenanceState.OFF) {
             // Keep the components that are not in maintenance mode.
             if (sch.getMaintenanceState() == MaintenanceState.OFF) {
-              enabledComponents.add(sch.getServiceComponentName());
+              enabledComponents.add(new RecoveryConfigComponent(sch));
             }
           }
         }
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
index fb53b1f..738e06e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.Set;
 import org.apache.ambari.server.H2DatabaseCleaner;
 import org.apache.ambari.server.agent.HeartbeatTestHelper;
 import org.apache.ambari.server.agent.RecoveryConfig;
+import org.apache.ambari.server.agent.RecoveryConfigComponent;
 import org.apache.ambari.server.agent.RecoveryConfigHelper;
 import org.apache.ambari.server.controller.internal.DeleteHostComponentStatusMetaData;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
@@ -53,6 +55,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.eventbus.EventBus;
 import com.google.inject.Guice;
@@ -145,8 +148,7 @@ public class RecoveryConfigHelperTest {
    * @throws Exception
    */
   @Test
-  public void testServiceComponentInstalled()
-      throws Exception {
+  public void testServiceComponentInstalled() throws Exception {
     Cluster cluster = heartbeatTestHelper.getDummyCluster();
 
     RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
@@ -157,7 +159,9 @@ public class RecoveryConfigHelperTest {
 
     // Get the recovery configuration
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+    assertEquals(Lists.newArrayList(
+       new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+      ), recoveryConfig.getEnabledComponents());
 
     // Install HDFS::NAMENODE to trigger a component installed event
     hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true);
@@ -165,7 +169,10 @@ public class RecoveryConfigHelperTest {
 
     // Verify the new config
     recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+      new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+      ), recoveryConfig.getEnabledComponents());
   }
 
   /**
@@ -188,14 +195,19 @@ public class RecoveryConfigHelperTest {
 
     // Get the recovery configuration
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+      new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
 
     // Uninstall HDFS::DATANODE from host1
     hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).delete(new DeleteHostComponentStatusMetaData());
 
     // Verify the new config
     recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
   }
 
   /**
@@ -216,7 +228,9 @@ public class RecoveryConfigHelperTest {
 
     // Get the recovery configuration
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INSTALLED)
+    ), recoveryConfig.getEnabledComponents());
 
     // Get cluser-env config and turn off recovery for the cluster
     Config config = cluster.getDesiredConfigByType("cluster-env");
@@ -238,8 +252,7 @@ public class RecoveryConfigHelperTest {
    * @throws Exception
    */
   @Test
-  public void testMaintenanceModeChanged()
-      throws Exception {
+  public void testMaintenanceModeChanged() throws Exception {
     Cluster cluster = heartbeatTestHelper.getDummyCluster();
     RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
     Service hdfs = cluster.addService(HDFS, repositoryVersion);
@@ -252,13 +265,18 @@ public class RecoveryConfigHelperTest {
 
     // Get the recovery configuration
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+      new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
 
     hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setMaintenanceState(MaintenanceState.ON);
 
     // Only NAMENODE is left
     recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
   }
 
   /**
@@ -267,8 +285,7 @@ public class RecoveryConfigHelperTest {
    * @throws Exception
    */
   @Test
-  public void testServiceComponentRecoveryChanged()
-      throws Exception {
+  public void testServiceComponentRecoveryChanged() throws Exception {
     Cluster cluster = heartbeatTestHelper.getDummyCluster();
     RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
     Service hdfs = cluster.addService(HDFS, repositoryVersion);
@@ -278,14 +295,16 @@ public class RecoveryConfigHelperTest {
 
     // Get the recovery configuration
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
 
     // Turn off auto start for HDFS::DATANODE
     hdfs.getServiceComponent(DATANODE).setRecoveryEnabled(false);
 
     // Get the latest config. DATANODE should not be present.
     recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
-    assertEquals(recoveryConfig.getEnabledComponents(), "");
+    assertEquals(new ArrayList<RecoveryConfigComponent>(), recoveryConfig.getEnabledComponents());
   }
 
   /**
@@ -319,7 +338,9 @@ public class RecoveryConfigHelperTest {
     // Simulate registration for Host1: Get the recovery configuration right away for Host1.
     // It makes an entry for cluster name and Host1 in the timestamp dictionary.
     RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), "Host1");
-    assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+    assertEquals(Lists.newArrayList(
+      new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+    ), recoveryConfig.getEnabledComponents());
 
     // Simulate heartbeat for Host2: When second host heartbeats, it first checks if config stale.
     // This should return true since it did not get the configuration during registration.

-- 
To stop receiving notification emails like this one, please contact
hapylestat@apache.org.