You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2016/06/08 23:32:33 UTC

ambari git commit: AMBARI-16935: Retry and recover from component install failures

Repository: ambari
Updated Branches:
  refs/heads/trunk 56fc6ee14 -> 60ad7b270


AMBARI-16935: Retry and recover from component install failures


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/60ad7b27
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/60ad7b27
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/60ad7b27

Branch: refs/heads/trunk
Commit: 60ad7b270b4ed37ff68513e4609a9467d2c50976
Parents: 56fc6ee
Author: Nahappan Somasundaram <ns...@hortonworks.com>
Authored: Tue May 31 15:10:09 2016 -0700
Committer: Nahappan Somasundaram <ns...@hortonworks.com>
Committed: Wed Jun 8 12:21:52 2016 -0700

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 12 +++-
 .../main/python/ambari_agent/RecoveryManager.py | 42 +++++++++++---
 .../python/ambari_agent/TestRecoveryManager.py  | 58 +++++++++++++-------
 3 files changed, 82 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4de5390..f217a54 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -426,6 +426,14 @@ class ActionQueue(threading.Thread):
                                                 command['hostLevelParams']['clientsToUpdateConfigs'])
         roleResult['configurationTags'] = configHandler.read_actual_component(
             command['role'])
+    elif status == self.FAILED_STATUS:
+      if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
+              and self.controller.recovery_manager.configured_for_recovery(command['role']):
+        if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
+          self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED)
+          logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
+                      ", current state of " + command['role'] + " to " +
+                      self.controller.recovery_manager.get_current_status(command['role']))
 
     self.commandStatuses.put_command_status(command, roleResult)
 
@@ -511,7 +519,9 @@ class ActionQueue(threading.Thread):
         component_status = LiveStatus.DEAD_STATUS
         if self.controller.recovery_manager.enabled() \
           and self.controller.recovery_manager.configured_for_recovery(component):
-          self.controller.recovery_manager.update_current_status(component, component_status)
+          if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED):
+            self.controller.recovery_manager.update_current_status(component, component_status)
+
       request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \
                                 not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 87d9483..be335f2 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -54,6 +54,7 @@ class RecoveryManager:
   STARTED = "STARTED"
   INSTALLED = "INSTALLED"
   INIT = "INIT"  # TODO: What is the state when machine is reset
+  INSTALL_FAILED = "INSTALL_FAILED"
   COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
   COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
 
@@ -75,9 +76,10 @@ class RecoveryManager:
     "stale_config": False
   }
 
-  def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False):
+  def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
     self.recovery_enabled = recovery_enabled
     self.auto_start_only = auto_start_only
+    self.auto_install_start = auto_install_start
     self.max_count = 6
     self.window_in_min = 60
     self.retry_gap = 5
@@ -107,7 +109,7 @@ class RecoveryManager:
 
     self.actions = {}
 
-    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", -1)
+    self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1)
 
     pass
 
@@ -238,7 +240,7 @@ class RecoveryManager:
       return False
 
     status = self.statuses[component]
-    if self.auto_start_only:
+    if self.auto_start_only or self.auto_install_start:
       if status["current"] == status["desired"]:
         return False
       if status["desired"] not in self.allowed_desired_states:
@@ -306,6 +308,8 @@ class RecoveryManager:
     This method computes the recovery commands for the following transitions
     INSTALLED --> STARTED
     INIT --> INSTALLED
+    INSTALL_FAILED --> INSTALLED
+    INSTALL_FAILED --> STARTED
     """
     commands = []
     for component in self.statuses.keys():
@@ -316,6 +320,15 @@ class RecoveryManager:
           if status["desired"] == self.STARTED:
             if status["current"] == self.INSTALLED:
               command = self.get_start_command(component)
+        elif self.auto_install_start:
+          if status["desired"] == self.STARTED:
+            if status["current"] == self.INSTALLED:
+              command = self.get_start_command(component)
+            elif status["current"] == self.INSTALL_FAILED:
+              command = self.get_install_command(component)
+          elif status["desired"] == self.INSTALLED:
+            if status["current"] == self.INSTALL_FAILED:
+              command = self.get_install_command(component)
         else:
           # START, INSTALL, RESTART
           if status["desired"] != status["current"]:
@@ -324,9 +337,13 @@ class RecoveryManager:
                 command = self.get_start_command(component)
               elif status["current"] == self.INIT:
                 command = self.get_install_command(component)
+              elif status["current"] == self.INSTALL_FAILED:
+                command = self.get_install_command(component)
             elif status["desired"] == self.INSTALLED:
               if status["current"] == self.INIT:
                 command = self.get_install_command(component)
+              elif status["current"] == self.INSTALL_FAILED:
+                command = self.get_install_command(component)
               elif status["current"] == self.STARTED:
                 command = self.get_stop_command(component)
           else:
@@ -536,7 +553,7 @@ class RecoveryManager:
     """
     TODO: Server sends the recovery configuration - call update_config after parsing
     "recoveryConfig": {
-      "type" : "DEFAULT|AUTO_START|FULL",
+      "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
       "maxCount" : 10,
       "windowInMinutes" : 60,
       "retryGap" : 0,
@@ -547,6 +564,7 @@ class RecoveryManager:
 
     recovery_enabled = False
     auto_start_only = False
+    auto_install_start = False
     max_count = 6
     window_in_min = 60
     retry_gap = 5
@@ -559,10 +577,13 @@ class RecoveryManager:
       logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"]))
       config = reg_resp["recoveryConfig"]
       if "type" in config:
-        if config["type"] in ["AUTO_START", "FULL"]:
+        if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
           recovery_enabled = True
           if config["type"] == "AUTO_START":
             auto_start_only = True
+          elif config["type"] == "AUTO_INSTALL_START":
+            auto_install_start = True
+
       if "maxCount" in config:
         max_count = self._read_int_(config["maxCount"], max_count)
       if "windowInMinutes" in config:
@@ -579,7 +600,7 @@ class RecoveryManager:
         recovery_timestamp = config['recoveryTimestamp']
 
     self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
-                       enabled_components, recovery_timestamp)
+                       auto_install_start, enabled_components, recovery_timestamp)
     pass
 
   """
@@ -591,11 +612,12 @@ class RecoveryManager:
   max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
   recovery_enabled - True or False. Indicates whether recovery is enabled or not.
   auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
+  auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
   enabled_components - CSV of componenents enabled for auto start.
   recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
   """
   def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
-                    auto_start_only, enabled_components, recovery_timestamp):
+                    auto_start_only, auto_install_start, enabled_components, recovery_timestamp):
     """
     Update recovery configuration, recovery is disabled if configuration values
     are not correct
@@ -625,16 +647,20 @@ class RecoveryManager:
     self.window_in_sec = window_in_min * 60
     self.retry_gap_in_sec = retry_gap * 60
     self.auto_start_only = auto_start_only
+    self.auto_install_start = auto_install_start
     self.max_lifetime_count = max_lifetime_count
     self.enabled_components = []
     self.recovery_timestamp = recovery_timestamp
 
     self.allowed_desired_states = [self.STARTED, self.INSTALLED]
-    self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED]
+    self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]
 
     if self.auto_start_only:
       self.allowed_desired_states = [self.STARTED]
       self.allowed_current_states = [self.INSTALLED]
+    elif self.auto_install_start:
+      self.allowed_desired_states = [self.INSTALLED, self.STARTED]
+      self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
 
     if enabled_components is not None and len(enabled_components) > 0:
       components = enabled_components.split(",")

http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index ed0fd2f..f7382d5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -372,7 +372,7 @@ class _TestRecoveryManager(TestCase):
   @patch.object(RecoveryManager, "_now_")
   def test_get_recovery_commands(self, time_mock):
     time_mock.side_effect = \
-      [1000, 1001, 1002, 1003,
+      [1000, 1001, 1002, 1003, 1004,
        1100, 1101, 1102,
        1200, 1201, 1203,
        4000, 4001, 4002, 4003,
@@ -380,12 +380,12 @@ class _TestRecoveryManager(TestCase):
        4200, 4201, 4202,
        4300, 4301, 4302]
     rm = RecoveryManager(tempfile.mktemp(), True)
-    rm.update_config(15, 5, 1, 16, True, False, "", -1)
+    rm.update_config(15, 5, 1, 16, True, False, False, "", -1)
 
     command1 = copy.deepcopy(self.command)
 
     rm.store_or_update_command(command1)
-    rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+    rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
     self.assertEqual("INSTALLED", rm.get_current_status("NODEMANAGER"))
@@ -395,6 +395,14 @@ class _TestRecoveryManager(TestCase):
     self.assertEqual(1, len(commands))
     self.assertEqual("START", commands[0]["roleCommand"])
 
+    rm.update_config(2, 5, 1, 5, True, False, True, "NODEMANAGER", -1)
+    rm.update_current_status("NODEMANAGER", "INSTALL_FAILED")
+    rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+    commands = rm.get_recovery_commands()
+    self.assertEqual(1, len(commands))
+    self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "STARTED")
 
@@ -411,14 +419,14 @@ class _TestRecoveryManager(TestCase):
     self.assertEqual(1, len(commands))
     self.assertEqual("INSTALL", commands[0]["roleCommand"])
 
-    rm.update_config(2, 5, 1, 5, True, True, "", -1)
+    rm.update_config(2, 5, 1, 5, True, True, False, "", -1)
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
 
     commands = rm.get_recovery_commands()
     self.assertEqual(0, len(commands))
 
-    rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+    rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
     rm.update_current_status("NODEMANAGER", "INIT")
     rm.update_desired_status("NODEMANAGER", "INSTALLED")
 
@@ -459,25 +467,25 @@ class _TestRecoveryManager(TestCase):
   def test_update_rm_config(self, mock_uc):
     rm = RecoveryManager(tempfile.mktemp())
     rm.update_configuration_from_registration(None)
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
 
     mock_uc.reset_mock()
     rm.update_configuration_from_registration({})
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
 
     mock_uc.reset_mock()
     rm.update_configuration_from_registration(
       {"recoveryConfig": {
       "type" : "DEFAULT"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
 
     mock_uc.reset_mock()
     rm.update_configuration_from_registration(
       {"recoveryConfig": {
         "type" : "FULL"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, "", -1)])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "", -1)])
 
     mock_uc.reset_mock()
     rm.update_configuration_from_registration(
@@ -485,7 +493,15 @@ class _TestRecoveryManager(TestCase):
         "type" : "AUTO_START",
         "max_count" : "med"}}
     )
-    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, "", -1)])
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "", -1)])
+
+    mock_uc.reset_mock()
+    rm.update_configuration_from_registration(
+      {"recoveryConfig": {
+        "type" : "AUTO_INSTALL_START",
+        "max_count" : "med"}}
+    )
+    mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "", -1)])
 
     mock_uc.reset_mock()
     rm.update_configuration_from_registration(
@@ -498,7 +514,7 @@ class _TestRecoveryManager(TestCase):
         "components" : " A,B",
         "recoveryTimestamp" : 1}}
     )
-    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, " A,B", 1)])
+    mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B", 1)])
   pass
 
   @patch.object(RecoveryManager, "_now_")
@@ -510,7 +526,7 @@ class _TestRecoveryManager(TestCase):
     rec_st = rm.get_recovery_status()
     self.assertEquals(rec_st, {"summary": "DISABLED"})
 
-    rm.update_config(2, 5, 1, 4, True, True, "", -1)
+    rm.update_config(2, 5, 1, 4, True, True, False, "", -1)
     rec_st = rm.get_recovery_status()
     self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
 
@@ -554,12 +570,12 @@ class _TestRecoveryManager(TestCase):
       [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
 
     rm = RecoveryManager(tempfile.mktemp(), True)
-    rm.update_config(5, 5, 1, 11, True, False, "", -1)
+    rm.update_config(5, 5, 1, 11, True, False, False, "", -1)
 
     command1 = copy.deepcopy(self.command)
 
     rm.store_or_update_command(command1)
-    rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+    rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
     rm.update_current_status("NODEMANAGER", "INSTALLED")
     rm.update_desired_status("NODEMANAGER", "STARTED")
 
@@ -595,24 +611,24 @@ class _TestRecoveryManager(TestCase):
 
   def test_configured_for_recovery(self):
     rm = RecoveryManager(tempfile.mktemp(), True)
-    rm.update_config(12, 5, 1, 15, True, False, "A,B", -1)
+    rm.update_config(12, 5, 1, 15, True, False, False, "A,B", -1)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertTrue(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, "", -1)
+    rm.update_config(5, 5, 1, 11, True, False, False, "", -1)
     self.assertFalse(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, "A", -1)
+    rm.update_config(5, 5, 1, 11, True, False, False, "A", -1)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
 
-    rm.update_config(5, 5, 1, 11, True, False, "A", -1)
+    rm.update_config(5, 5, 1, 11, True, False, False, "A", -1)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
     self.assertFalse(rm.configured_for_recovery("C"))
 
-    rm.update_config(5, 5, 1, 11, True, False, "A, D, F ", -1)
+    rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ", -1)
     self.assertTrue(rm.configured_for_recovery("A"))
     self.assertFalse(rm.configured_for_recovery("B"))
     self.assertFalse(rm.configured_for_recovery("C"))
@@ -626,7 +642,7 @@ class _TestRecoveryManager(TestCase):
       [1000, 1071, 1372]
     rm = RecoveryManager(tempfile.mktemp(), True)
 
-    rm.update_config(2, 5, 1, 4, True, True, "", -1)
+    rm.update_config(2, 5, 1, 4, True, True, False, "", -1)
 
     rm.execute("COMPONENT")
     actions = rm.get_actions_copy()["COMPONENT"]
@@ -644,7 +660,7 @@ class _TestRecoveryManager(TestCase):
   def test_is_action_info_stale(self, time_mock):
 
     rm = RecoveryManager(tempfile.mktemp(), True)
-    rm.update_config(5, 60, 5, 16, True, False, "", -1)
+    rm.update_config(5, 60, 5, 16, True, False, False, "", -1)
 
     time_mock.return_value = 0
     self.assertFalse(rm.is_action_info_stale("COMPONENT_NAME"))