You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2016/06/08 23:32:33 UTC
ambari git commit: AMBARI-16935: Retry and recover from component
install failures
Repository: ambari
Updated Branches:
refs/heads/trunk 56fc6ee14 -> 60ad7b270
AMBARI-16935: Retry and recover from component install failures
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/60ad7b27
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/60ad7b27
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/60ad7b27
Branch: refs/heads/trunk
Commit: 60ad7b270b4ed37ff68513e4609a9467d2c50976
Parents: 56fc6ee
Author: Nahappan Somasundaram <ns...@hortonworks.com>
Authored: Tue May 31 15:10:09 2016 -0700
Committer: Nahappan Somasundaram <ns...@hortonworks.com>
Committed: Wed Jun 8 12:21:52 2016 -0700
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 12 +++-
.../main/python/ambari_agent/RecoveryManager.py | 42 +++++++++++---
.../python/ambari_agent/TestRecoveryManager.py | 58 +++++++++++++-------
3 files changed, 82 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4de5390..f217a54 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -426,6 +426,14 @@ class ActionQueue(threading.Thread):
command['hostLevelParams']['clientsToUpdateConfigs'])
roleResult['configurationTags'] = configHandler.read_actual_component(
command['role'])
+ elif status == self.FAILED_STATUS:
+ if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
+ and self.controller.recovery_manager.configured_for_recovery(command['role']):
+ if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
+ self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED)
+ logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
+ ", current state of " + command['role'] + " to " +
+ self.controller.recovery_manager.get_current_status(command['role']))
self.commandStatuses.put_command_status(command, roleResult)
@@ -511,7 +519,9 @@ class ActionQueue(threading.Thread):
component_status = LiveStatus.DEAD_STATUS
if self.controller.recovery_manager.enabled() \
and self.controller.recovery_manager.configured_for_recovery(component):
- self.controller.recovery_manager.update_current_status(component, component_status)
+ if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED):
+ self.controller.recovery_manager.update_current_status(component, component_status)
+
request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \
not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND)
http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 87d9483..be335f2 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -54,6 +54,7 @@ class RecoveryManager:
STARTED = "STARTED"
INSTALLED = "INSTALLED"
INIT = "INIT" # TODO: What is the state when machine is reset
+ INSTALL_FAILED = "INSTALL_FAILED"
COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
@@ -75,9 +76,10 @@ class RecoveryManager:
"stale_config": False
}
- def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False):
+ def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
self.recovery_enabled = recovery_enabled
self.auto_start_only = auto_start_only
+ self.auto_install_start = auto_install_start
self.max_count = 6
self.window_in_min = 60
self.retry_gap = 5
@@ -107,7 +109,7 @@ class RecoveryManager:
self.actions = {}
- self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", -1)
+ self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1)
pass
@@ -238,7 +240,7 @@ class RecoveryManager:
return False
status = self.statuses[component]
- if self.auto_start_only:
+ if self.auto_start_only or self.auto_install_start:
if status["current"] == status["desired"]:
return False
if status["desired"] not in self.allowed_desired_states:
@@ -306,6 +308,8 @@ class RecoveryManager:
This method computes the recovery commands for the following transitions
INSTALLED --> STARTED
INIT --> INSTALLED
+ INSTALL_FAILED --> INSTALLED
+ INSTALL_FAILED --> STARTED
"""
commands = []
for component in self.statuses.keys():
@@ -316,6 +320,15 @@ class RecoveryManager:
if status["desired"] == self.STARTED:
if status["current"] == self.INSTALLED:
command = self.get_start_command(component)
+ elif self.auto_install_start:
+ if status["desired"] == self.STARTED:
+ if status["current"] == self.INSTALLED:
+ command = self.get_start_command(component)
+ elif status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED:
+ if status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
else:
# START, INSTALL, RESTART
if status["desired"] != status["current"]:
@@ -324,9 +337,13 @@ class RecoveryManager:
command = self.get_start_command(component)
elif status["current"] == self.INIT:
command = self.get_install_command(component)
+ elif status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
elif status["desired"] == self.INSTALLED:
if status["current"] == self.INIT:
command = self.get_install_command(component)
+ elif status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
elif status["current"] == self.STARTED:
command = self.get_stop_command(component)
else:
@@ -536,7 +553,7 @@ class RecoveryManager:
"""
TODO: Server sends the recovery configuration - call update_config after parsing
"recoveryConfig": {
- "type" : "DEFAULT|AUTO_START|FULL",
+ "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
"maxCount" : 10,
"windowInMinutes" : 60,
"retryGap" : 0,
@@ -547,6 +564,7 @@ class RecoveryManager:
recovery_enabled = False
auto_start_only = False
+ auto_install_start = False
max_count = 6
window_in_min = 60
retry_gap = 5
@@ -559,10 +577,13 @@ class RecoveryManager:
logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"]))
config = reg_resp["recoveryConfig"]
if "type" in config:
- if config["type"] in ["AUTO_START", "FULL"]:
+ if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
recovery_enabled = True
if config["type"] == "AUTO_START":
auto_start_only = True
+ elif config["type"] == "AUTO_INSTALL_START":
+ auto_install_start = True
+
if "maxCount" in config:
max_count = self._read_int_(config["maxCount"], max_count)
if "windowInMinutes" in config:
@@ -579,7 +600,7 @@ class RecoveryManager:
recovery_timestamp = config['recoveryTimestamp']
self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
- enabled_components, recovery_timestamp)
+ auto_install_start, enabled_components, recovery_timestamp)
pass
"""
@@ -591,11 +612,12 @@ class RecoveryManager:
max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
recovery_enabled - True or False. Indicates whether recovery is enabled or not.
auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
+ auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
enabled_components - CSV of componenents enabled for auto start.
recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up.
"""
def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
- auto_start_only, enabled_components, recovery_timestamp):
+ auto_start_only, auto_install_start, enabled_components, recovery_timestamp):
"""
Update recovery configuration, recovery is disabled if configuration values
are not correct
@@ -625,16 +647,20 @@ class RecoveryManager:
self.window_in_sec = window_in_min * 60
self.retry_gap_in_sec = retry_gap * 60
self.auto_start_only = auto_start_only
+ self.auto_install_start = auto_install_start
self.max_lifetime_count = max_lifetime_count
self.enabled_components = []
self.recovery_timestamp = recovery_timestamp
self.allowed_desired_states = [self.STARTED, self.INSTALLED]
- self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED]
+ self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]
if self.auto_start_only:
self.allowed_desired_states = [self.STARTED]
self.allowed_current_states = [self.INSTALLED]
+ elif self.auto_install_start:
+ self.allowed_desired_states = [self.INSTALLED, self.STARTED]
+ self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
if enabled_components is not None and len(enabled_components) > 0:
components = enabled_components.split(",")
http://git-wip-us.apache.org/repos/asf/ambari/blob/60ad7b27/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index ed0fd2f..f7382d5 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -372,7 +372,7 @@ class _TestRecoveryManager(TestCase):
@patch.object(RecoveryManager, "_now_")
def test_get_recovery_commands(self, time_mock):
time_mock.side_effect = \
- [1000, 1001, 1002, 1003,
+ [1000, 1001, 1002, 1003, 1004,
1100, 1101, 1102,
1200, 1201, 1203,
4000, 4001, 4002, 4003,
@@ -380,12 +380,12 @@ class _TestRecoveryManager(TestCase):
4200, 4201, 4202,
4300, 4301, 4302]
rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(15, 5, 1, 16, True, False, "", -1)
+ rm.update_config(15, 5, 1, 16, True, False, False, "", -1)
command1 = copy.deepcopy(self.command)
rm.store_or_update_command(command1)
- rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+ rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertEqual("INSTALLED", rm.get_current_status("NODEMANAGER"))
@@ -395,6 +395,14 @@ class _TestRecoveryManager(TestCase):
self.assertEqual(1, len(commands))
self.assertEqual("START", commands[0]["roleCommand"])
+ rm.update_config(2, 5, 1, 5, True, False, True, "NODEMANAGER", -1)
+ rm.update_current_status("NODEMANAGER", "INSTALL_FAILED")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+ commands = rm.get_recovery_commands()
+ self.assertEqual(1, len(commands))
+ self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
rm.update_current_status("NODEMANAGER", "INIT")
rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -411,14 +419,14 @@ class _TestRecoveryManager(TestCase):
self.assertEqual(1, len(commands))
self.assertEqual("INSTALL", commands[0]["roleCommand"])
- rm.update_config(2, 5, 1, 5, True, True, "", -1)
+ rm.update_config(2, 5, 1, 5, True, True, False, "", -1)
rm.update_current_status("NODEMANAGER", "INIT")
rm.update_desired_status("NODEMANAGER", "INSTALLED")
commands = rm.get_recovery_commands()
self.assertEqual(0, len(commands))
- rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+ rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
rm.update_current_status("NODEMANAGER", "INIT")
rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -459,25 +467,25 @@ class _TestRecoveryManager(TestCase):
def test_update_rm_config(self, mock_uc):
rm = RecoveryManager(tempfile.mktemp())
rm.update_configuration_from_registration(None)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
mock_uc.reset_mock()
rm.update_configuration_from_registration({})
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
mock_uc.reset_mock()
rm.update_configuration_from_registration(
{"recoveryConfig": {
"type" : "DEFAULT"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, "", -1)])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)])
mock_uc.reset_mock()
rm.update_configuration_from_registration(
{"recoveryConfig": {
"type" : "FULL"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, "", -1)])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "", -1)])
mock_uc.reset_mock()
rm.update_configuration_from_registration(
@@ -485,7 +493,15 @@ class _TestRecoveryManager(TestCase):
"type" : "AUTO_START",
"max_count" : "med"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, "", -1)])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "", -1)])
+
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration(
+ {"recoveryConfig": {
+ "type" : "AUTO_INSTALL_START",
+ "max_count" : "med"}}
+ )
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "", -1)])
mock_uc.reset_mock()
rm.update_configuration_from_registration(
@@ -498,7 +514,7 @@ class _TestRecoveryManager(TestCase):
"components" : " A,B",
"recoveryTimestamp" : 1}}
)
- mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, " A,B", 1)])
+ mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B", 1)])
pass
@patch.object(RecoveryManager, "_now_")
@@ -510,7 +526,7 @@ class _TestRecoveryManager(TestCase):
rec_st = rm.get_recovery_status()
self.assertEquals(rec_st, {"summary": "DISABLED"})
- rm.update_config(2, 5, 1, 4, True, True, "", -1)
+ rm.update_config(2, 5, 1, 4, True, True, False, "", -1)
rec_st = rm.get_recovery_status()
self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
@@ -554,12 +570,12 @@ class _TestRecoveryManager(TestCase):
[1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(5, 5, 1, 11, True, False, "", -1)
+ rm.update_config(5, 5, 1, 11, True, False, False, "", -1)
command1 = copy.deepcopy(self.command)
rm.store_or_update_command(command1)
- rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1)
+ rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1)
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -595,24 +611,24 @@ class _TestRecoveryManager(TestCase):
def test_configured_for_recovery(self):
rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(12, 5, 1, 15, True, False, "A,B", -1)
+ rm.update_config(12, 5, 1, 15, True, False, False, "A,B", -1)
self.assertTrue(rm.configured_for_recovery("A"))
self.assertTrue(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, "", -1)
+ rm.update_config(5, 5, 1, 11, True, False, False, "", -1)
self.assertFalse(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, "A", -1)
+ rm.update_config(5, 5, 1, 11, True, False, False, "A", -1)
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, "A", -1)
+ rm.update_config(5, 5, 1, 11, True, False, False, "A", -1)
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
- rm.update_config(5, 5, 1, 11, True, False, "A, D, F ", -1)
+ rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ", -1)
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
@@ -626,7 +642,7 @@ class _TestRecoveryManager(TestCase):
[1000, 1071, 1372]
rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(2, 5, 1, 4, True, True, "", -1)
+ rm.update_config(2, 5, 1, 4, True, True, False, "", -1)
rm.execute("COMPONENT")
actions = rm.get_actions_copy()["COMPONENT"]
@@ -644,7 +660,7 @@ class _TestRecoveryManager(TestCase):
def test_is_action_info_stale(self, time_mock):
rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(5, 60, 5, 16, True, False, "", -1)
+ rm.update_config(5, 60, 5, 16, True, False, False, "", -1)
time_mock.return_value = 0
self.assertFalse(rm.is_action_info_stale("COMPONENT_NAME"))