You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/06/22 13:58:17 UTC
[ambari] branch trunk updated: AMBARI-24166. Metric Collector goes down after HDFS restart post EU
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00274d4 AMBARI-24166. Metric Collector goes down after HDFS restart post EU
00274d4 is described below
commit 00274d4e25bbffbd21d4eb8fdecf47b70c68c7da
Author: aonishuk <ao...@hortonworks.com>
AuthorDate: Fri Jun 22 16:58:14 2018 +0300
AMBARI-24166. Metric Collector goes down after HDFS restart post EU
---
.../main/python/ambari_agent/InitializerModule.py | 2 +-
.../main/python/ambari_agent/RecoveryManager.py | 118 ++++++++-------
.../listeners/ConfigurationEventListener.py | 6 +
.../listeners/HostLevelParamsEventListener.py | 4 +-
.../test/python/ambari_agent/TestActionQueue.py | 2 +-
.../python/ambari_agent/TestRecoveryManager.py | 158 +++++++--------------
.../apache/ambari/server/agent/RecoveryConfig.java | 67 +--------
.../ambari/server/agent/RecoveryConfigHelper.java | 52 +------
.../ambari/server/agent/TestHeartbeatHandler.java | 10 --
.../agent/stomp/HostLevelParamsHolderTest.java | 18 +--
.../configuration/RecoveryConfigHelperTest.java | 73 ----------
11 files changed, 131 insertions(+), 379 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index b15ad9b..b641c06 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -92,7 +92,7 @@ class InitializerModule:
self.server_responses_listener = ServerResponsesListener(self)
self.file_cache = FileCache(self.config)
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
- self.recovery_manager = RecoveryManager()
+ self.recovery_manager = RecoveryManager(self)
self.commandStatuses = CommandStatusDict(self)
self.init_threads()
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index f7ec134..78c430d 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -22,7 +22,6 @@ import pprint
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.LiveStatus import LiveStatus
-
logger = logging.getLogger()
@@ -73,7 +72,7 @@ class RecoveryManager:
"stale_config": False
}
- def __init__(self, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
+ def __init__(self, initializer_module, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
self.recovery_enabled = recovery_enabled
self.auto_start_only = auto_start_only
self.auto_install_start = auto_install_start
@@ -96,9 +95,10 @@ class RecoveryManager:
self.__cache_lock = threading.RLock()
self.active_command_count = 0
self.cluster_id = None
+ self.initializer_module = initializer_module
self.actions = {}
- self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
+ self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start)
def on_execution_command_start(self):
with self.__active_command_lock:
@@ -436,6 +436,8 @@ class RecoveryManager:
if action_counter["count"] < self.max_count:
if seconds_since_last_attempt > self.retry_gap_in_sec:
return True
+ else:
+ logger.info("Not running recovery command due to retry_gap = {0} (seconds)".format(self.retry_gap_in_sec))
else:
sec_since_last_reset = now - action_counter["lastReset"]
if sec_since_last_reset > self.window_in_sec:
@@ -447,17 +449,28 @@ class RecoveryManager:
return int(time.time())
def update_recovery_config(self, dictionary):
- """
- TODO: Server sends the recovery configuration - call update_config after parsing
- "recoveryConfig": {
- "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
- "maxCount" : 10,
- "windowInMinutes" : 60,
- "retryGap" : 0,
- "components" : "a,b"
- }
- """
-
+ if dictionary and "recoveryConfig" in dictionary:
+ if logger.isEnabledFor(logging.INFO):
+ logger.info("RecoverConfig = %s", pprint.pformat(dictionary["recoveryConfig"]))
+ config = dictionary["recoveryConfig"]
+ if 'components' in config:
+ enabled_components = config['components']
+ enabled_components_list = []
+
+ components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components]
+ for service, component, state in components:
+ enabled_components_list.append(component)
+ self.update_desired_status(component, state)
+ # Recovery Manager is Component oriented, however Agent require Service and component name to build properly
+ # commands. As workaround, we pushing service name from the server and keeping it relation at agent.
+ #
+ # However it important to keep map actual, for this reason relation could be updated if service will
+ # push another service <-> component relation
+ self.__component_to_service_map[component] = service
+
+ self.enabled_components = enabled_components_list
+
+ def on_config_update(self):
recovery_enabled = False
auto_start_only = False
auto_install_start = False
@@ -465,37 +478,36 @@ class RecoveryManager:
window_in_min = 60
retry_gap = 5
max_lifetime_count = 12
- enabled_components = []
- if dictionary and "recoveryConfig" in dictionary:
- if logger.isEnabledFor(logging.INFO):
- logger.info("RecoverConfig = %s", pprint.pformat(dictionary["recoveryConfig"]))
- config = dictionary["recoveryConfig"]
- if "type" in config:
- if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
+ cluster_cache = self.initializer_module.configurations_cache[self.cluster_id]
+
+ if 'configurations' in cluster_cache and 'cluster-env' in cluster_cache['configurations']:
+ config = cluster_cache['configurations']['cluster-env']
+ if "recovery_type" in config:
+ if config["recovery_type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
recovery_enabled = True
- if config["type"] == "AUTO_START":
+ if config["recovery_type"] == "AUTO_START":
auto_start_only = True
- elif config["type"] == "AUTO_INSTALL_START":
+ elif config["recovery_type"] == "AUTO_INSTALL_START":
auto_install_start = True
- if "maxCount" in config:
- max_count = self._read_int_(config["maxCount"], max_count)
- if "windowInMinutes" in config:
- window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
- if "retryGap" in config:
- retry_gap = self._read_int_(config["retryGap"], retry_gap)
- if 'maxLifetimeCount' in config:
- max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
+ if "recovery_enabled" in config:
+ recovery_enabled = self._read_bool_(config, "recovery_enabled", recovery_enabled)
- if 'components' in config:
- enabled_components = config['components']
+ if "recovery_max_count" in config:
+ max_count = self._read_int_(config, "recovery_max_count", max_count)
+ if "recovery_window_in_minutes" in config:
+ window_in_min = self._read_int_(config, "recovery_window_in_minutes", window_in_min)
+ if "recovery_retry_interval" in config:
+ retry_gap = self._read_int_(config, "recovery_retry_interval", retry_gap)
+ if 'recovery_lifetime_max_count' in config:
+ max_lifetime_count = self._read_int_(config, 'recovery_lifetime_max_count', max_lifetime_count)
self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
- auto_install_start, enabled_components)
+ auto_install_start)
def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
- auto_start_only, auto_install_start, enabled_components):
+ auto_start_only, auto_install_start):
"""
Update recovery configuration with the specified values.
@@ -506,8 +518,6 @@ class RecoveryManager:
recovery_enabled - True or False. Indicates whether recovery is enabled or not.
auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
- enabled_components - CSV of componenents enabled for auto start.
-
Update recovery configuration, recovery is disabled if configuration values
are not correct
@@ -539,7 +549,6 @@ class RecoveryManager:
self.auto_start_only = auto_start_only
self.auto_install_start = auto_install_start
self.max_lifetime_count = max_lifetime_count
- self.enabled_components = []
self.allowed_desired_states = [self.STARTED, self.INSTALLED]
self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED]
@@ -551,26 +560,7 @@ class RecoveryManager:
self.allowed_desired_states = [self.INSTALLED, self.STARTED]
self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
- if enabled_components is not None and len(enabled_components) > 0:
- components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components]
- for service, component, state in components:
- self.enabled_components.append(component)
- self.update_desired_status(component, state)
-
- # Recovery Manager is Component oriented, however Agent require Service and component name to build properly
- # commands. As workaround, we pushing service name from the server and keeping it relation at agent.
- #
- # However it important to keep map actual, for this reason relation could be updated if service will
- # push another service <-> component relation
- self.__component_to_service_map[component] = service
-
self.recovery_enabled = recovery_enabled
- if self.recovery_enabled:
- logger.info(
- "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and"
- " lifetime max being %s. Enabled components - %s",
- self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
- ', '.join(self.enabled_components))
def get_unique_task_id(self):
self.id += 1
@@ -687,10 +677,18 @@ class RecoveryManager:
def get_start_command(self, component):
return self.get_command(component, "START")
- def _read_int_(self, value, default_value=0):
+ def _read_int_(self, config, key, default_value=0):
int_value = default_value
try:
- int_value = int(value)
- except ValueError:
+ int_value = int(config[key])
+ except (ValueError, KeyError):
pass
return int_value
+
+ def _read_bool_(self, config, key, default_value=False):
+ bool_value = default_value
+ try:
+ bool_value = (config[key].lower() == "true")
+ except KeyError:
+ pass
+ return bool_value
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index 9887a8a..e5309bb 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -33,6 +33,7 @@ class ConfigurationEventListener(EventListener):
def __init__(self, initializer_module):
super(ConfigurationEventListener, self).__init__(initializer_module)
self.configurations_cache = initializer_module.configurations_cache
+ self.recovery_manager = initializer_module.recovery_manager
def on_event(self, headers, message):
"""
@@ -49,6 +50,11 @@ class ConfigurationEventListener(EventListener):
self.configurations_cache.rewrite_cache(message['clusters'], message['hash'])
+ if message['clusters']:
+ # FIXME: Recovery manager does not support multiple cluster as of now.
+ self.recovery_manager.cluster_id = message['clusters'].keys()[0]
+ self.recovery_manager.on_config_update()
+
def get_handled_path(self):
return Constants.CONFIGURATIONS_TOPIC
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
index d4b4088..18c1252 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -53,9 +53,9 @@ class HostLevelParamsEventListener(EventListener):
cluster_id = message['clusters'].keys()[0]
if 'recoveryConfig' in message['clusters'][cluster_id]:
- logging.info("Updating recoveryConfig from metadata")
- self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id])
+ logging.info("Updating recoveryConfig from hostLevelParams")
self.recovery_manager.cluster_id = cluster_id
+ self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id])
def get_handled_path(self):
return Constants.HOST_LEVEL_PARAMS_TOPIC
\ No newline at end of file
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index af41fe8..d75fbb2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -508,7 +508,7 @@ class TestActionQueue(TestCase):
initializer_module.init()
initializer_module.config = config
initializer_module.recovery_manager = RecoveryManager(tempfile.mktemp())
- initializer_module.recovery_manager.update_config(5, 5, 1, 11, True, False, False, "")
+ initializer_module.recovery_manager.update_config(5, 5, 1, 11, True, False, False)
with patch("__builtin__.open") as open_mock:
# Make file read calls visible
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index 432e74b..8195315 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -117,7 +117,7 @@ class _TestRecoveryManager(TestCase):
}
def test_defaults(self):
- rm = RecoveryManager()
+ rm = RecoveryManager(MagicMock())
self.assertFalse(rm.enabled())
self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
@@ -132,32 +132,32 @@ class _TestRecoveryManager(TestCase):
[1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
- rm = RecoveryManager(True, False)
+ rm = RecoveryManager(MagicMock(), True, False)
self.assertTrue(rm.enabled())
- config = rm.update_config(0, 60, 5, 12, True, False, False, "")
+ config = rm.update_config(0, 60, 5, 12, True, False, False)
self.assertFalse(rm.enabled())
- rm.update_config(6, 60, 5, 12, True, False, False, "")
+ rm.update_config(6, 60, 5, 12, True, False, False)
self.assertTrue(rm.enabled())
- rm.update_config(6, 0, 5, 12, True, False, False, "")
+ rm.update_config(6, 0, 5, 12, True, False, False)
self.assertFalse(rm.enabled())
- rm.update_config(6, 60, 0, 12, True, False, False, "")
+ rm.update_config(6, 60, 0, 12, True, False, False)
self.assertFalse(rm.enabled())
- rm.update_config(6, 60, 1, 12, True, False, False, None)
+ rm.update_config(6, 60, 1, 12, True, False, False)
self.assertTrue(rm.enabled())
- rm.update_config(6, 60, 61, 12, True, False, False, None)
+ rm.update_config(6, 60, 61, 12, True, False, False)
self.assertFalse(rm.enabled())
- rm.update_config(6, 60, 5, 4, True, False, False, "")
+ rm.update_config(6, 60, 5, 4, True, False, False)
self.assertFalse(rm.enabled())
# maximum 2 in 2 minutes and at least 1 minute wait
- rm.update_config(2, 5, 1, 4, True, False, False, "")
+ rm.update_config(2, 5, 1, 4, True, False, False)
self.assertTrue(rm.enabled())
# T = 1000-2
@@ -183,7 +183,7 @@ class _TestRecoveryManager(TestCase):
self.assertFalse(rm.may_execute("NODEMANAGER")) # too soon
# maximum 2 in 2 minutes and no min wait
- rm.update_config(2, 5, 1, 5, True, True, False, "")
+ rm.update_config(2, 5, 1, 5, True, True, False)
# T = 1500-3
self.assertTrue(rm.execute("NODEMANAGER2"))
@@ -202,9 +202,11 @@ class _TestRecoveryManager(TestCase):
def test_recovery_required(self):
rm = RecoveryManager(True, False)
- rm.update_config(12, 5, 1, 15, True, False, False, [
+ rm.update_config(12, 5, 1, 15, True, False, False, )
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
+
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "INSTALLED")
self.assertFalse(rm.requires_recovery("NODEMANAGER"))
@@ -250,17 +252,19 @@ class _TestRecoveryManager(TestCase):
def test_recovery_required2(self):
rm = RecoveryManager(True, True)
- rm.update_config(15, 5, 1, 16, True, False, False, [
+ rm.update_config(15, 5, 1, 16, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
rm = RecoveryManager( True, True)
- rm.update_config(15, 5, 1, 16, True, False, False, [
+ rm.update_config(15, 5, 1, 16, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -270,7 +274,7 @@ class _TestRecoveryManager(TestCase):
self.assertFalse(rm.requires_recovery("DATANODE"))
rm = RecoveryManager(True, True)
- rm.update_config(15, 5, 1, 16, True, False, False, "")
+ rm.update_config(15, 5, 1, 16, True, False, False)
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertFalse(rm.requires_recovery("NODEMANAGER"))
@@ -279,9 +283,10 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("DATANODE", "STARTED")
self.assertFalse(rm.requires_recovery("DATANODE"))
- rm.update_config(15, 5, 1, 16, True, False, False, [
+ rm.update_config(15, 5, 1, 16, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -290,83 +295,16 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("DATANODE", "STARTED")
self.assertFalse(rm.requires_recovery("DATANODE"))
- @patch.object(RecoveryManager, "update_config")
- def test_update_rm_config(self, mock_uc):
- rm = RecoveryManager()
- rm.update_recovery_config(None)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config({})
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config(
- {"recoveryConfig": {
- "type" : "DEFAULT"}}
- )
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config(
- {"recoveryConfig": {
- "type" : "FULL"}}
- )
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config(
- {"recoveryConfig": {
- "type" : "AUTO_START",
- "max_count" : "med"}}
- )
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config(
- {"recoveryConfig": {
- "type" : "AUTO_INSTALL_START",
- "max_count" : "med"}}
- )
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, [])])
-
- mock_uc.reset_mock()
- rm.update_recovery_config(
- {"recoveryConfig": {
- "type": "AUTO_START",
- "maxCount": "5",
- "windowInMinutes" : 20,
- "retryGap": 2,
- "maxLifetimeCount" : 5,
- "components": [
- {
- "service_name": "A",
- "component_name": "A",
- "desired_state": "INSTALLED"
- },
- {
- "service_name": "B",
- "component_name": "B",
- "desired_state": "INSTALLED"
- }
- ],
- "recoveryTimestamp": 1}}
- )
- mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, [
- {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
- {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'}
- ])])
-
@patch.object(RecoveryManager, "_now_")
def test_recovery_report(self, time_mock):
time_mock.side_effect = \
[1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1815]
- rm = RecoveryManager()
+ rm = RecoveryManager(MagicMock())
rec_st = rm.get_recovery_status()
self.assertEquals(rec_st, {"summary": "DISABLED"})
- rm.update_config(2, 5, 1, 4, True, True, False, "")
+ rm.update_config(2, 5, 1, 4, True, True, False)
rec_st = rm.get_recovery_status()
self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
@@ -409,14 +347,15 @@ class _TestRecoveryManager(TestCase):
[1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
rm = RecoveryManager(True)
- rm.update_config(5, 5, 0, 11, True, False, False, "")
+ rm.update_config(5, 5, 0, 11, True, False, False)
command1 = copy.deepcopy(self.command)
#rm.store_or_update_command(command1)
- rm.update_config(12, 5, 1, 15, True, False, False, [
- {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
- ])
+ rm.update_config(12, 5, 1, 15, True, False, False)
+
+ rm.update_recovery_config({'recoveryConfig':{'components':[{'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}]}})
+
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -439,36 +378,45 @@ class _TestRecoveryManager(TestCase):
self.assertEqual("START", commands[0]["roleCommand"])
def test_configured_for_recovery(self):
- rm = RecoveryManager(True)
- rm.update_config(12, 5, 1, 15, True, False, False, [
+ rm = RecoveryManager(MagicMock(), True)
+ rm.update_config(12, 5, 1, 15, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
{'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'},
- ])
+ ]}})
+
self.assertTrue(rm.configured_for_recovery("A"))
self.assertTrue(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, [])
+ rm.update_config(5, 5, 1, 11, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[]}})
+
self.assertFalse(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, [
+ rm.update_config(5, 5, 1, 11, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
+
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, [
+ rm.update_config(5, 5, 1, 11, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components': [
{'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
+
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
- rm.update_config(5, 5, 1, 11, True, False, False, [
+ rm.update_config(5, 5, 1, 11, True, False, False)
+ rm.update_recovery_config({'recoveryConfig':{'components':[
{'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
{'component_name': 'D', 'service_name': 'D', 'desired_state': 'INSTALLED'},
{'component_name': 'F', 'service_name': 'F', 'desired_state': 'INSTALLED'}
- ])
+ ]}})
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
@@ -482,7 +430,7 @@ class _TestRecoveryManager(TestCase):
[1000, 1071, 1372]
rm = RecoveryManager(True)
- rm.update_config(2, 5, 1, 4, True, True, False, "")
+ rm.update_config(2, 5, 1, 4, True, True, False)
rm.execute("COMPONENT")
actions = rm.get_actions_copy()["COMPONENT"]
@@ -500,7 +448,7 @@ class _TestRecoveryManager(TestCase):
def test_is_action_info_stale(self, time_mock):
rm = RecoveryManager(True)
- rm.update_config(5, 60, 5, 16, True, False, False, "")
+ rm.update_config(5, 60, 5, 16, True, False, False)
time_mock.return_value = 0
self.assertFalse(rm.is_action_info_stale("COMPONENT_NAME"))
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
index c7f5774..e71d9cc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -29,37 +29,11 @@ import com.google.gson.annotations.SerializedName;
*/
public class RecoveryConfig {
- @SerializedName("type")
- @JsonProperty("type")
- private String type;
-
- @SerializedName("maxCount")
- @JsonProperty("maxCount")
- private String maxCount;
-
- @SerializedName("windowInMinutes")
- @JsonProperty("windowInMinutes")
- private String windowInMinutes;
-
- @SerializedName("retryGap")
- @JsonProperty("retryGap")
- private String retryGap;
-
- @SerializedName("maxLifetimeCount")
- @JsonProperty("maxLifetimeCount")
- private String maxLifetimeCount;
-
@SerializedName("components")
@JsonProperty("components")
private List<RecoveryConfigComponent> enabledComponents;
- public RecoveryConfig(String type, String maxCount, String windowInMinutes, String retryGap, String maxLifetimeCount,
- List<RecoveryConfigComponent> enabledComponents) {
- this.type = type;
- this.maxCount = maxCount;
- this.windowInMinutes = windowInMinutes;
- this.retryGap = retryGap;
- this.maxLifetimeCount = maxLifetimeCount;
+ public RecoveryConfig(List<RecoveryConfigComponent> enabledComponents) {
this.enabledComponents = enabledComponents;
}
@@ -67,26 +41,6 @@ public class RecoveryConfig {
return enabledComponents == null ? null : Collections.unmodifiableList(enabledComponents);
}
- public String getType() {
- return type;
- }
-
- public String getMaxCount() {
- return maxCount;
- }
-
- public String getWindowInMinutes() {
- return windowInMinutes;
- }
-
- public String getRetryGap() {
- return retryGap;
- }
-
- public String getMaxLifetimeCount() {
- return maxLifetimeCount;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -94,35 +48,18 @@ public class RecoveryConfig {
RecoveryConfig that = (RecoveryConfig) o;
- if (type != null ? !type.equals(that.type) : that.type != null) return false;
- if (maxCount != null ? !maxCount.equals(that.maxCount) : that.maxCount != null) return false;
- if (windowInMinutes != null ? !windowInMinutes.equals(that.windowInMinutes) : that.windowInMinutes != null)
- return false;
- if (retryGap != null ? !retryGap.equals(that.retryGap) : that.retryGap != null) return false;
- if (maxLifetimeCount != null ? !maxLifetimeCount.equals(that.maxLifetimeCount) : that.maxLifetimeCount != null)
- return false;
return enabledComponents != null ? enabledComponents.equals(that.enabledComponents) : that.enabledComponents == null;
}
@Override
public int hashCode() {
- int result = type != null ? type.hashCode() : 0;
- result = 31 * result + (maxCount != null ? maxCount.hashCode() : 0);
- result = 31 * result + (windowInMinutes != null ? windowInMinutes.hashCode() : 0);
- result = 31 * result + (retryGap != null ? retryGap.hashCode() : 0);
- result = 31 * result + (maxLifetimeCount != null ? maxLifetimeCount.hashCode() : 0);
- result = 31 * result + (enabledComponents != null ? enabledComponents.hashCode() : 0);
+ int result = (enabledComponents != null ? enabledComponents.hashCode() : 0);
return result;
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder("RecoveryConfig{");
- buffer.append(", type=").append(type);
- buffer.append(", maxCount=").append(maxCount);
- buffer.append(", windowInMinutes=").append(windowInMinutes);
- buffer.append(", retryGap=").append(retryGap);
- buffer.append(", maxLifetimeCount=").append(maxLifetimeCount);
buffer.append(", components=").append(enabledComponents);
buffer.append('}');
return buffer.toString();
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
index 6d5b9cd..239e8c1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
@@ -100,14 +100,8 @@ public class RecoveryConfigHelper {
}
AutoStartConfig autoStartConfig = new AutoStartConfig(clusterName);
-
- RecoveryConfig recoveryConfig = new RecoveryConfig(autoStartConfig.getNodeRecoveryType(),
- autoStartConfig.getNodeRecoveryMaxCount(),
- autoStartConfig.getNodeRecoveryWindowInMin(),
- autoStartConfig.getNodeRecoveryRetryGap(),
- autoStartConfig.getNodeRecoveryLifetimeMaxCount(),
- autoStartConfig.isRecoveryEnabled() ? autoStartConfig.getEnabledComponents(hostname) : null);
-
+
+ RecoveryConfig recoveryConfig = new RecoveryConfig(autoStartConfig.getEnabledComponents(hostname));
return recoveryConfig;
}
@@ -367,48 +361,6 @@ public class RecoveryConfigHelper {
}
/**
- * Get the node recovery type. The only supported value is AUTO_START.
- * @return
- */
- private String getNodeRecoveryType() {
- return getProperty(RECOVERY_TYPE_KEY, RECOVERY_TYPE_DEFAULT);
- }
-
- /**
- * Get configured max count of recovery attempt allowed per host component in a window
- * This is reset when agent is restarted.
- * @return
- */
- private String getNodeRecoveryMaxCount() {
- return getProperty(RECOVERY_MAX_COUNT_KEY, RECOVERY_MAX_COUNT_DEFAULT);
- }
-
- /**
- * Get configured max lifetime count of recovery attempt allowed per host component.
- * This is reset when agent is restarted.
- * @return
- */
- private String getNodeRecoveryLifetimeMaxCount() {
- return getProperty(RECOVERY_LIFETIME_MAX_COUNT_KEY, RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
- }
-
- /**
- * Get configured window size in minutes
- * @return
- */
- private String getNodeRecoveryWindowInMin() {
- return getProperty(RECOVERY_WINDOW_IN_MIN_KEY, RECOVERY_WINDOW_IN_MIN_DEFAULT);
- }
-
- /**
- * Get the configured retry gap between tries per host component
- * @return
- */
- private String getNodeRecoveryRetryGap() {
- return getProperty(RECOVERY_RETRY_GAP_KEY, RECOVERY_RETRY_GAP_DEFAULT);
- }
-
- /**
* Get the property value for the specified key. If not present, return default value.
* @param key The key for which property value is required.
* @param defaultValue Default value to return if key is not found.
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index c072923..ca417c0 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -424,11 +424,6 @@ public class TestHeartbeatHandler {
reg.setPrefix(Configuration.PREFIX_DIR);
RegistrationResponse rr = handler.handleRegistration(reg);
RecoveryConfig rc = rr.getRecoveryConfig();
- assertEquals(rc.getMaxCount(), "4");
- assertEquals(rc.getType(), "AUTO_START");
- assertEquals(rc.getMaxLifetimeCount(), "10");
- assertEquals(rc.getRetryGap(), "2");
- assertEquals(rc.getWindowInMinutes(), "23");
assertEquals(rc.getEnabledComponents(), "DATANODE,NAMENODE");
// Send a heart beat with the recovery timestamp set to the
@@ -498,11 +493,6 @@ public class TestHeartbeatHandler {
reg.setPrefix(Configuration.PREFIX_DIR);
RegistrationResponse rr = handler.handleRegistration(reg);
RecoveryConfig rc = rr.getRecoveryConfig();
- assertEquals(rc.getMaxCount(), "4");
- assertEquals(rc.getType(), "AUTO_START");
- assertEquals(rc.getMaxLifetimeCount(), "10");
- assertEquals(rc.getRetryGap(), "2");
- assertEquals(rc.getWindowInMinutes(), "23");
assertEquals(rc.getEnabledComponents(), "DATANODE,NAMENODE"); // HDFS_CLIENT is in maintenance mode
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolderTest.java
index 3ed4442..4fef7fe 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/stomp/HostLevelParamsHolderTest.java
@@ -41,8 +41,7 @@ public class HostLevelParamsHolderTest {
HostLevelParamsUpdateEvent current = new HostLevelParamsUpdateEvent(HOST_ID, Collections.emptyMap());
Map<String, HostLevelParamsCluster> clusters = new HashMap<>();
HostRepositories hostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster cluster = new HostLevelParamsCluster(hostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster cluster = new HostLevelParamsCluster(hostRepositories, new RecoveryConfig(null));
clusters.put("1", cluster);
HostLevelParamsUpdateEvent update = new HostLevelParamsUpdateEvent(HOST_ID, clusters);
@@ -58,8 +57,7 @@ public class HostLevelParamsHolderTest {
public void testHandleUpdateEmptyUpdate() {
Map<String, HostLevelParamsCluster> clusters = new HashMap<>();
HostRepositories hostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster cluster = new HostLevelParamsCluster(hostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster cluster = new HostLevelParamsCluster(hostRepositories, new RecoveryConfig(null));
clusters.put("1", cluster);
HostLevelParamsUpdateEvent current = new HostLevelParamsUpdateEvent(HOST_ID, clusters);
HostLevelParamsUpdateEvent update = new HostLevelParamsUpdateEvent(HOST_ID, Collections.emptyMap());
@@ -76,15 +74,13 @@ public class HostLevelParamsHolderTest {
public void testHandleUpdateNoChanges() {
Map<String, HostLevelParamsCluster> currentClusters = new HashMap<>();
HostRepositories currentHostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster currentCluster = new HostLevelParamsCluster(currentHostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster currentCluster = new HostLevelParamsCluster(currentHostRepositories, new RecoveryConfig(null));
currentClusters.put("1", currentCluster);
HostLevelParamsUpdateEvent current = new HostLevelParamsUpdateEvent(HOST_ID, currentClusters);
Map<String, HostLevelParamsCluster> updateClusters = new HashMap<>();
HostRepositories updateHostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster updateCluster = new HostLevelParamsCluster(updateHostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster updateCluster = new HostLevelParamsCluster(updateHostRepositories, new RecoveryConfig(null));
updateClusters.put("1", updateCluster);
HostLevelParamsUpdateEvent update = new HostLevelParamsUpdateEvent(HOST_ID, updateClusters);
@@ -100,15 +96,13 @@ public class HostLevelParamsHolderTest {
public void testHandleUpdateOnChanges() {
Map<String, HostLevelParamsCluster> currentClusters = new HashMap<>();
HostRepositories currentHostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster currentCluster = new HostLevelParamsCluster(currentHostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster currentCluster = new HostLevelParamsCluster(currentHostRepositories, new RecoveryConfig(null));
currentClusters.put("1", currentCluster);
HostLevelParamsUpdateEvent current = new HostLevelParamsUpdateEvent(HOST_ID, currentClusters);
Map<String, HostLevelParamsCluster> updateClusters = new HashMap<>();
HostRepositories updateHostRepositories = new HostRepositories(Collections.emptySortedMap(), Collections.emptySortedMap());
- HostLevelParamsCluster updateCluster = new HostLevelParamsCluster(updateHostRepositories, new RecoveryConfig(null,
- null, null, null, null, null));
+ HostLevelParamsCluster updateCluster = new HostLevelParamsCluster(updateHostRepositories, new RecoveryConfig(null));
updateClusters.put("2", updateCluster);
HostLevelParamsUpdateEvent update = new HostLevelParamsUpdateEvent(HOST_ID, updateClusters);
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
index 738e06e..db1155b 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
@@ -23,8 +23,6 @@ import static org.apache.ambari.server.agent.DummyHeartbeatConstants.DummyHostna
import static org.apache.ambari.server.agent.DummyHeartbeatConstants.HDFS;
import static org.apache.ambari.server.agent.DummyHeartbeatConstants.NAMENODE;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
@@ -45,7 +43,6 @@ import org.apache.ambari.server.orm.OrmTestHelper;
import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
import org.apache.ambari.server.orm.entities.RepositoryVersionEntity;
import org.apache.ambari.server.state.Cluster;
-import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.StackId;
@@ -56,7 +53,6 @@ import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.inject.Guice;
import com.google.inject.Inject;
@@ -108,40 +104,6 @@ public class RecoveryConfigHelperTest {
}
/**
- * Test default cluster-env properties for recovery.
- */
- @Test
- public void testRecoveryConfigDefaultValues()
- throws Exception {
- RecoveryConfig recoveryConfig = recoveryConfigHelper.getDefaultRecoveryConfig();
- assertEquals(recoveryConfig.getMaxLifetimeCount(), RecoveryConfigHelper.RECOVERY_LIFETIME_MAX_COUNT_DEFAULT);
- assertEquals(recoveryConfig.getMaxCount(), RecoveryConfigHelper.RECOVERY_MAX_COUNT_DEFAULT);
- assertEquals(recoveryConfig.getRetryGap(), RecoveryConfigHelper.RECOVERY_RETRY_GAP_DEFAULT);
- assertEquals(recoveryConfig.getWindowInMinutes(), RecoveryConfigHelper.RECOVERY_WINDOW_IN_MIN_DEFAULT);
- assertEquals(recoveryConfig.getType(), RecoveryConfigHelper.RECOVERY_TYPE_DEFAULT);
- assertNull(recoveryConfig.getEnabledComponents());
- }
-
- /**
- * Test cluster-env properties from a dummy cluster
- *
- * @throws Exception
- */
- @Test
- public void testRecoveryConfigValues()
- throws Exception {
- String hostname = "hostname1";
- Cluster cluster = getDummyCluster(Sets.newHashSet(hostname));
- RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), hostname);
- assertEquals(recoveryConfig.getMaxLifetimeCount(), "10");
- assertEquals(recoveryConfig.getMaxCount(), "4");
- assertEquals(recoveryConfig.getRetryGap(), "2");
- assertEquals(recoveryConfig.getWindowInMinutes(), "23");
- assertEquals(recoveryConfig.getType(), "AUTO_START");
- assertNotNull(recoveryConfig.getEnabledComponents());
- }
-
- /**
* Install a component with auto start enabled. Verify that the old config was
* invalidated.
*
@@ -211,41 +173,6 @@ public class RecoveryConfigHelperTest {
}
/**
- * Disable cluster level auto start and verify that the config is stale.
- *
- * @throws Exception
- */
- @Test
- public void testClusterEnvConfigChanged()
- throws Exception {
- Cluster cluster = heartbeatTestHelper.getDummyCluster();
- RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
- Service hdfs = cluster.addService(HDFS, repositoryVersion);
-
- hdfs.addServiceComponent(DATANODE).setRecoveryEnabled(true);
- hdfs.getServiceComponent(DATANODE).addServiceComponentHost(DummyHostname1);
- hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setDesiredState(State.INSTALLED);
-
- // Get the recovery configuration
- RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(Lists.newArrayList(
- new RecoveryConfigComponent(DATANODE, HDFS, State.INSTALLED)
- ), recoveryConfig.getEnabledComponents());
-
- // Get cluser-env config and turn off recovery for the cluster
- Config config = cluster.getDesiredConfigByType("cluster-env");
-
- config.updateProperties(new HashMap<String, String>() {{
- put(RecoveryConfigHelper.RECOVERY_ENABLED_KEY, "false");
- }});
- config.save();
-
- // Get the recovery configuration again and verify that there are no components to be auto started
- recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertNull(recoveryConfig.getEnabledComponents());
- }
-
- /**
* Change the maintenance mode of a service component host and verify that
* config is stale.
*