You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ha...@apache.org on 2018/05/22 16:04:24 UTC
[ambari] branch trunk updated: [AMBARI-23877] Service auto-start not working (dgrinenko)
This is an automated email from the ASF dual-hosted git repository.
hapylestat pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new 571dfe6 [AMBARI-23877] Service auto-start not working (dgrinenko)
571dfe6 is described below
commit 571dfe62627e786efad31fa35702c61785004dab
Author: Reishin <ha...@gmail.com>
AuthorDate: Fri May 18 03:44:39 2018 +0300
[AMBARI-23877] Service auto-start not working (dgrinenko)
---
.../src/main/python/ambari_agent/ActionQueue.py | 19 +-
.../src/main/python/ambari_agent/AmbariConfig.py | 8 +-
.../python/ambari_agent/ComponentStatusExecutor.py | 1 -
.../main/python/ambari_agent/InitializerModule.py | 24 +-
.../main/python/ambari_agent/RecoveryManager.py | 337 ++++++++-------------
.../python/ambari_agent/TestRecoveryManager.py | 119 +++++---
.../apache/ambari/server/agent/RecoveryConfig.java | 22 +-
.../server/agent/RecoveryConfigComponent.java | 118 ++++++++
.../ambari/server/agent/RecoveryConfigHelper.java | 8 +-
.../configuration/RecoveryConfigHelperTest.java | 53 +++-
10 files changed, 392 insertions(+), 317 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 65239ed..f0c996b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -143,7 +143,7 @@ class ActionQueue(threading.Thread):
if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- if command == None:
+ if command is None:
break
self.process_command(command)
@@ -153,17 +153,16 @@ class ActionQueue(threading.Thread):
while not self.stop_event.is_set():
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
- if command == None:
+ if command is None:
break
# If command is not retry_enabled then do not start them in parallel
# checking just one command is enough as all commands for a stage is sent
# at the same time and retry is only enabled for initial start/install
- retryAble = False
+ retry_able = False
if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']:
- retryAble = command['commandParams']['command_retry_enabled'] == "true"
- if retryAble:
- logger.info("Kicking off a thread for the command, id=" +
- str(command['commandId']) + " taskId=" + str(command['taskId']))
+ retry_able = command['commandParams']['command_retry_enabled'] == "true"
+ if retry_able:
+ logger.info("Kicking off a thread for the command, id={} taskId={}".format(command['commandId'], command['taskId']))
t = threading.Thread(target=self.process_command, args=(command,))
t.daemon = True
t.start()
@@ -172,14 +171,14 @@ class ActionQueue(threading.Thread):
break
pass
pass
- except (Queue.Empty):
+ except Queue.Empty:
pass
- except:
+ except Exception:
logger.exception("ActionQueue thread failed with exception. Re-running it")
logger.info("ActionQueue thread has successfully finished")
def fillRecoveryCommands(self):
- if not self.tasks_in_progress_or_pending():
+ if self.recovery_manager.enabled() and not self.tasks_in_progress_or_pending():
self.put(self.recovery_manager.get_recovery_commands())
def processBackgroundQueueSafeEmpty(self):
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 1e95fbe..88aa8ea 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -186,21 +186,23 @@ class AmbariConfig:
@property
def cluster_cache_dir(self):
return os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
- @property
- def recovery_cache_dir(self):
- return os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
+
@property
def alerts_cachedir(self):
return os.path.join(self.cache_dir, FileCache.ALERTS_CACHE_DIRECTORY)
+
@property
def stacks_dir(self):
return os.path.join(self.cache_dir, FileCache.STACKS_CACHE_DIRECTORY)
+
@property
def common_services_dir(self):
return os.path.join(self.cache_dir, FileCache.COMMON_SERVICES_DIRECTORY)
+
@property
def extensions_dir(self):
return os.path.join(self.cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY)
+
@property
def host_scripts_dir(self):
return os.path.join(self.cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY)
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index c9f86da..5d20495 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -154,7 +154,6 @@ class ComponentStatusExecutor(threading.Thread):
self.send_updates_to_server({cluster_id: [result]})
return result
-
return None
def send_updates_to_server(self, cluster_reports):
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 052e8c1..787df16 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -46,6 +46,7 @@ from ambari_agent.AlertStatusReporter import AlertStatusReporter
logger = logging.getLogger(__name__)
+
class InitializerModule:
"""
- Instantiate some singleton classes or widely used instances along with providing their dependencies.
@@ -56,6 +57,23 @@ class InitializerModule:
def __init__(self):
self.stop_event = threading.Event()
self.config = AmbariConfig.get_resolved_config()
+
+ self.is_registered = None
+ self.metadata_cache = None
+ self.topology_cache = None
+ self.host_level_params_cache = None
+ self.configurations_cache = None
+ self.alert_definitions_cache = None
+ self.configuration_builder = None
+ self.stale_alerts_monitor = None
+ self.server_responses_listener = None
+ self.file_cache = None
+ self.customServiceOrchestrator = None
+ self.recovery_manager = None
+ self.commandStatuses = None
+ self.action_queue = None
+ self.alert_scheduler_handler = None
+
self.init()
def init(self):
@@ -71,14 +89,10 @@ class InitializerModule:
self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir)
self.configuration_builder = ConfigurationBuilder(self)
self.stale_alerts_monitor = StaleAlertsMonitor(self)
-
self.server_responses_listener = ServerResponsesListener()
-
self.file_cache = FileCache(self.config)
-
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
-
- self.recovery_manager = RecoveryManager(self.config.recovery_cache_dir)
+ self.recovery_manager = RecoveryManager()
self.commandStatuses = CommandStatusDict(self)
self.init_threads()
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index 3a20b20..f7ec134 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -15,10 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
import logging
import copy
-import os
import time
import threading
import pprint
@@ -29,17 +25,17 @@ from ambari_agent.LiveStatus import LiveStatus
logger = logging.getLogger()
-"""
-RecoveryManager has the following capabilities:
-* Store data needed for execution commands extracted from STATUS command
-* Generate INSTALL command
-* Generate START command
-"""
-
class RecoveryManager:
+ """
+ RecoveryManager has the following capabilities:
+ * Store data needed for execution commands extracted from STATUS command
+ * Generate INSTALL command
+ * Generate START command
+ """
COMMAND_TYPE = "commandType"
PAYLOAD_LEVEL = "payloadLevel"
+ SERVICE_NAME = "serviceName"
COMPONENT_NAME = "componentName"
ROLE = "role"
TASK_ID = "taskId"
@@ -57,7 +53,7 @@ class RecoveryManager:
INIT = "INIT" # TODO: What is the state when machine is reset
INSTALL_FAILED = "INSTALL_FAILED"
COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME"
- COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes
+ COMMAND_REFRESH_DELAY_SEC = 600
FILENAME = "recovery.json"
@@ -77,13 +73,15 @@ class RecoveryManager:
"stale_config": False
}
- def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
+ def __init__(self, recovery_enabled=False, auto_start_only=False, auto_install_start=False):
self.recovery_enabled = recovery_enabled
self.auto_start_only = auto_start_only
self.auto_install_start = auto_install_start
self.max_count = 6
self.window_in_min = 60
self.retry_gap = 5
+ self.window_in_sec = self.window_in_min * 60
+ self.retry_gap_in_sec = self.retry_gap * 60
self.max_lifetime_count = 12
self.id = int(time.time())
@@ -91,6 +89,7 @@ class RecoveryManager:
self.allowed_current_states = [self.INIT, self.INSTALLED]
self.enabled_components = []
self.statuses = {}
+ self.__component_to_service_map = {} # component => service map TODO: fix it later(hack here)
self.__status_lock = threading.RLock()
self.__command_lock = threading.RLock()
self.__active_command_lock = threading.RLock()
@@ -98,29 +97,16 @@ class RecoveryManager:
self.active_command_count = 0
self.cluster_id = None
- if not os.path.exists(cache_dir):
- try:
- os.makedirs(cache_dir)
- except:
- logger.critical("[RecoveryManager] Could not create the cache directory {0}".format(cache_dir))
-
- self.__actions_json_file = os.path.join(cache_dir, self.FILENAME)
-
self.actions = {}
-
self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "")
- pass
-
def on_execution_command_start(self):
with self.__active_command_lock:
self.active_command_count += 1
- pass
def on_execution_command_finish(self):
with self.__active_command_lock:
self.active_command_count -= 1
- pass
def has_active_command(self):
return self.active_command_count > 0
@@ -131,12 +117,10 @@ class RecoveryManager:
def get_current_status(self, component):
if component in self.statuses:
return self.statuses[component]["current"]
- pass
def get_desired_status(self, component):
if component in self.statuses:
return self.statuses[component]["desired"]
- pass
def update_config_staleness(self, component, is_config_stale):
"""
@@ -154,17 +138,10 @@ class RecoveryManager:
pass
self.statuses[component]["stale_config"] = is_config_stale
- pass
def handle_status_change(self, component, component_status):
- if not self.enabled() or not self.configured_for_recovery(component):
- return
-
- if component_status == LiveStatus.LIVE_STATUS:
- self.update_current_status(component, component_status)
- else:
- if (self.get_current_status(component) != self.INSTALL_FAILED):
- self.update_current_status(component, component_status)
+ if component_status == LiveStatus.LIVE_STATUS or self.get_current_status(component) != self.INSTALL_FAILED:
+ self.update_current_status(component, component_status)
def update_current_status(self, component, state):
"""
@@ -184,9 +161,8 @@ class RecoveryManager:
if self.statuses[component]["current"] != state:
logger.info("current status is set to %s for %s", state, component)
- self.statuses[component]["current"] = state
- pass
+ self.statuses[component]["current"] = state
def update_desired_status(self, component, state):
"""
@@ -202,21 +178,16 @@ class RecoveryManager:
logger.info("New status, desired status is set to %s for %s", self.statuses[component]["desired"], component)
finally:
self.__status_lock.release()
- pass
if self.statuses[component]["desired"] != state:
logger.info("desired status is set to %s for %s", state, component)
self.statuses[component]["desired"] = state
- pass
- """
- Whether specific components are enabled for recovery.
- """
def configured_for_recovery(self, component):
- if len(self.enabled_components) > 0 and component in self.enabled_components:
- return True
-
- return False
+ """
+ Whether specific components are enabled for recovery.
+ """
+ return len(self.enabled_components) > 0 and component in self.enabled_components
def requires_recovery(self, component):
"""
@@ -225,23 +196,15 @@ class RecoveryManager:
INIT --> INSTALLED --> STARTED
RE-INSTALLED (if configs do not match)
"""
- if not self.enabled():
- return False
-
- if not self.configured_for_recovery(component):
- return False
-
- if component not in self.statuses:
+ if not self.enabled() or not self.configured_for_recovery(component) or component not in self.statuses:
return False
status = self.statuses[component]
if self.auto_start_only or self.auto_install_start:
- if status["current"] == status["desired"]:
- return False
- if status["desired"] not in self.allowed_desired_states:
+ if status["current"] == status["desired"] or status["desired"] not in self.allowed_desired_states:
return False
else:
- if status["current"] == status["desired"] and status['stale_config'] == False:
+ if status["current"] == status["desired"] and status['stale_config'] is False:
return False
if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
@@ -249,9 +212,6 @@ class RecoveryManager:
logger.info("%s needs recovery, desired = %s, and current = %s.", component, status["desired"], status["current"])
return True
- pass
-
-
def get_recovery_status(self):
"""
@@ -268,8 +228,7 @@ class RecoveryManager:
]
}
"""
- report = {}
- report["summary"] = "DISABLED"
+ report = {"summary": "DISABLED"}
if self.enabled():
report["summary"] = "RECOVERABLE"
num_limits_reached = 0
@@ -279,24 +238,23 @@ class RecoveryManager:
try:
for component in self.actions.keys():
action = self.actions[component]
- recovery_state = {}
- recovery_state["name"] = component
- recovery_state["numAttempts"] = action["lifetimeCount"]
- recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
+ recovery_state = {
+ "name": component,
+ "numAttempts": action["lifetimeCount"],
+ "limitReached": self.max_lifetime_count <= action["lifetimeCount"]
+ }
recovery_states.append(recovery_state)
- if recovery_state["limitReached"] == True:
+ if recovery_state["limitReached"] is True:
num_limits_reached += 1
- pass
finally:
self.__status_lock.release()
- if num_limits_reached > 0:
+ if num_limits_reached > 0 and num_limits_reached == len(recovery_states):
+ report["summary"] = "UNRECOVERABLE"
+ elif num_limits_reached > 0:
report["summary"] = "PARTIALLY_RECOVERABLE"
- if num_limits_reached == len(recovery_states):
- report["summary"] = "UNRECOVERABLE"
return report
- pass
def get_recovery_commands(self):
"""
@@ -308,39 +266,34 @@ class RecoveryManager:
"""
commands = []
for component in self.statuses.keys():
- if self.requires_recovery(component) and self.may_execute(component):
+ if self.configured_for_recovery(component) and self.requires_recovery(component) and self.may_execute(component):
status = copy.deepcopy(self.statuses[component])
command = None
if self.auto_start_only:
- if status["desired"] == self.STARTED:
- if status["current"] == self.INSTALLED:
- command = self.get_start_command(component)
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+ command = self.get_start_command(component)
elif self.auto_install_start:
- if status["desired"] == self.STARTED:
- if status["current"] == self.INSTALLED:
- command = self.get_start_command(component)
- elif status["current"] == self.INSTALL_FAILED:
- command = self.get_install_command(component)
- elif status["desired"] == self.INSTALLED:
- if status["current"] == self.INSTALL_FAILED:
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+ command = self.get_start_command(component)
+ elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
command = self.get_install_command(component)
else:
# START, INSTALL, RESTART
if status["desired"] != status["current"]:
- if status["desired"] == self.STARTED:
- if status["current"] == self.INSTALLED:
- command = self.get_start_command(component)
- elif status["current"] == self.INIT:
- command = self.get_install_command(component)
- elif status["current"] == self.INSTALL_FAILED:
- command = self.get_install_command(component)
- elif status["desired"] == self.INSTALLED:
- if status["current"] == self.INIT:
- command = self.get_install_command(component)
- elif status["current"] == self.INSTALL_FAILED:
- command = self.get_install_command(component)
- elif status["current"] == self.STARTED:
- command = self.get_stop_command(component)
+ if status["desired"] == self.STARTED and status["current"] == self.INSTALLED:
+ command = self.get_start_command(component)
+ elif status["desired"] == self.STARTED and status["current"] == self.INIT:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.STARTED and status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INIT:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED and status["current"] == self.INSTALL_FAILED:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED and status["current"] == self.STARTED:
+ command = self.get_stop_command(component)
else:
if status["current"] == self.INSTALLED:
command = self.get_install_command(component)
@@ -349,11 +302,10 @@ class RecoveryManager:
if command:
self.execute(component)
- logger.info("Created recovery command %s for component %s",
- command[self.ROLE_COMMAND], command[self.ROLE])
+ logger.info("Created recovery command %s for component %s", command[self.ROLE_COMMAND], command[self.ROLE])
commands.append(command)
- return commands
+ return commands
def may_execute(self, action):
"""
@@ -369,8 +321,6 @@ class RecoveryManager:
finally:
self.__status_lock.release()
return self._execute_action_chk_only(action)
- pass
-
def execute(self, action):
"""
@@ -386,8 +336,6 @@ class RecoveryManager:
finally:
self.__status_lock.release()
return self._execute_action_(action)
- pass
-
def _execute_action_(self, action_name):
"""
@@ -398,7 +346,7 @@ class RecoveryManager:
executed = False
seconds_since_last_attempt = now - action_counter["lastAttempt"]
if action_counter["lifetimeCount"] < self.max_lifetime_count:
- #reset if window_in_sec seconds passed since last attempt
+ # reset if window_in_sec seconds passed since last attempt
if seconds_since_last_attempt > self.window_in_sec:
action_counter["count"] = 0
action_counter["lastReset"] = now
@@ -406,7 +354,7 @@ class RecoveryManager:
if action_counter["count"] < self.max_count:
if seconds_since_last_attempt > self.retry_gap_in_sec:
action_counter["count"] += 1
- action_counter["lifetimeCount"] +=1
+ action_counter["lifetimeCount"] += 1
if self.retry_gap > 0:
action_counter["lastAttempt"] = now
action_counter["warnedLastAttempt"] = False
@@ -414,28 +362,27 @@ class RecoveryManager:
action_counter["lastReset"] = now
executed = True
else:
- if action_counter["warnedLastAttempt"] == False:
+ if action_counter["warnedLastAttempt"] is False:
action_counter["warnedLastAttempt"] = True
logger.warn(
"%s seconds has not passed since last occurrence %s seconds back for %s. " +
"Will silently skip execution without warning till retry gap is passed",
self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
else:
- logger.debug(
- "%s seconds has not passed since last occurrence %s seconds back for %s",
- self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+ logger.debug("%s seconds has not passed since last occurrence %s seconds back for %s",
+ self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
else:
sec_since_last_reset = now - action_counter["lastReset"]
if sec_since_last_reset > self.window_in_sec:
action_counter["count"] = 1
- action_counter["lifetimeCount"] +=1
+ action_counter["lifetimeCount"] += 1
if self.retry_gap > 0:
action_counter["lastAttempt"] = now
action_counter["lastReset"] = now
action_counter["warnedLastReset"] = False
executed = True
else:
- if action_counter["warnedLastReset"] == False:
+ if action_counter["warnedLastReset"] is False:
action_counter["warnedLastReset"] = True
logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
"Will silently skip execution without warning till window is reset",
@@ -444,7 +391,7 @@ class RecoveryManager:
logger.debug("%s occurrences in %s minutes reached the limit for %s",
action_counter["count"], self.window_in_min, action_name)
else:
- if action_counter["warnedThresholdReached"] == False:
+ if action_counter["warnedThresholdReached"] is False:
action_counter["warnedThresholdReached"] = True
logger.warn("%s occurrences in agent life time reached the limit for %s. " +
"Will silently skip execution without warning till window is reset",
@@ -452,47 +399,7 @@ class RecoveryManager:
else:
logger.error("%s occurrences in agent life time reached the limit for %s",
action_counter["lifetimeCount"], action_name)
- self._dump_actions()
return executed
- pass
-
-
- def _dump_actions(self):
- """
- Dump recovery actions to FS
- """
- self.__cache_lock.acquire()
- try:
- with open(self.__actions_json_file, 'w') as f:
- json.dump(self.actions, f, indent=2)
- except Exception, exception:
- logger.exception("Unable to dump actions to {0}".format(self.__actions_json_file))
- return False
- finally:
- self.__cache_lock.release()
-
- return True
- pass
-
-
- def _load_actions(self):
- """
- Loads recovery actions from FS
- """
- self.__cache_lock.acquire()
-
- try:
- if os.path.isfile(self.__actions_json_file):
- with open(self.__actions_json_file, 'r') as fp:
- return json.load(fp)
- except Exception, exception:
- logger.warning("Unable to load recovery actions from {0}.".format(self.__actions_json_file))
- finally:
- self.__cache_lock.release()
-
- return {}
- pass
-
def get_actions_copy(self):
"""
@@ -503,8 +410,6 @@ class RecoveryManager:
return copy.deepcopy(self.actions)
finally:
self.__status_lock.release()
- pass
-
def is_action_info_stale(self, action_name):
"""
@@ -518,7 +423,6 @@ class RecoveryManager:
seconds_since_last_attempt = now - action_counter["lastAttempt"]
return seconds_since_last_attempt > self.window_in_sec
return False
- pass
def _execute_action_chk_only(self, action_name):
"""
@@ -538,12 +442,9 @@ class RecoveryManager:
return True
return False
- pass
def _now_(self):
return int(time.time())
- pass
-
def update_recovery_config(self, dictionary):
"""
@@ -564,8 +465,7 @@ class RecoveryManager:
window_in_min = 60
retry_gap = 5
max_lifetime_count = 12
- enabled_components = ""
-
+ enabled_components = []
if dictionary and "recoveryConfig" in dictionary:
if logger.isEnabledFor(logging.INFO):
@@ -593,27 +493,26 @@ class RecoveryManager:
self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only,
auto_install_start, enabled_components)
- pass
- """
- Update recovery configuration with the specified values.
-
- max_count - Configured maximum count of recovery attempt allowed per host component in a window.
- window_in_min - Configured window size in minutes.
- retry_gap - Configured retry gap between tries per host component
- max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
- recovery_enabled - True or False. Indicates whether recovery is enabled or not.
- auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
- auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
- enabled_components - CSV of componenents enabled for auto start.
- """
def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled,
auto_start_only, auto_install_start, enabled_components):
"""
+ Update recovery configuration with the specified values.
+
+ max_count - Configured maximum count of recovery attempt allowed per host component in a window.
+ window_in_min - Configured window size in minutes.
+ retry_gap - Configured retry gap between tries per host component
+ max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component.
+ recovery_enabled - True or False. Indicates whether recovery is enabled or not.
+ auto_start_only - True if AUTO_START recovery type was specified. False otherwise.
+ auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise.
+ enabled_components - CSV of components enabled for auto start.
+
+
Update recovery configuration, recovery is disabled if configuration values
are not correct
"""
- self.recovery_enabled = False;
+ self.recovery_enabled = False
if max_count <= 0:
logger.warn("Recovery disabled: max_count must be a non-negative number")
return
@@ -653,10 +552,17 @@ class RecoveryManager:
self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED]
if enabled_components is not None and len(enabled_components) > 0:
- components = enabled_components.split(",")
- for component in components:
- if len(component.strip()) > 0:
- self.enabled_components.append(component.strip())
+ components = [(item["service_name"], item["component_name"], item["desired_state"]) for item in enabled_components]
+ for service, component, state in components:
+ self.enabled_components.append(component)
+ self.update_desired_status(component, state)
+
+ # Recovery Manager is component-oriented; however, the Agent requires both the service name and the
+ # component name to build commands properly. As a workaround, the server pushes the service name and
+ # the agent keeps the component -> service relation.
+ #
+ # It is important to keep this map up to date, so the relation is overwritten whenever the server
+ # pushes a different service <-> component relation.
+ self.__component_to_service_map[component] = service
self.recovery_enabled = recovery_enabled
if self.recovery_enabled:
@@ -665,8 +571,6 @@ class RecoveryManager:
" lifetime max being %s. Enabled components - %s",
self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count,
', '.join(self.enabled_components))
- pass
-
def get_unique_task_id(self):
self.id += 1
@@ -679,33 +583,31 @@ class RecoveryManager:
if not self.enabled():
return
- if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']):
+ if self.ROLE_COMMAND not in command or not self.configured_for_recovery(command['role']):
return
if status == ActionQueue.COMPLETED_STATUS:
if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
- #self.update_config_staleness(command['role'], False)
- logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
- ", current state of " + command[self.ROLE] + " to " +
- self.get_current_status(command[self.ROLE]) )
+ logger.info("After EXECUTION_COMMAND (START), with taskId={}, current state of {} to {}".format(
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
- logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
- ", current state of " + command[self.ROLE] + " to " +
- self.get_current_status(command[self.ROLE]) )
+ logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId={}, current state of {} to {}".format(
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
- if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
+ if 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
- #self.update_config_staleness(command['role'], False)
- logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " +
- self.get_current_status(command[self.ROLE]) )
+ logger.info("After EXECUTION_COMMAND (RESTART), current state of {} to {}".format(
+ command[self.ROLE], self.get_current_status(command[self.ROLE])))
+
elif status == ActionQueue.FAILED_STATUS:
if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
- logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
- ", current state of " + command[self.ROLE] + " to " +
- self.get_current_status(command[self.ROLE]))
+ logger.info("After EXECUTION_COMMAND (INSTALL), with taskId={}, current state of {} to {}".format(
+ command['taskId'], command[self.ROLE], self.get_current_status(command[self.ROLE])))
def process_execution_command(self, command):
"""
@@ -714,28 +616,30 @@ class RecoveryManager:
if not self.enabled():
return
- if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+ if self.COMMAND_TYPE not in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
return
- if not self.ROLE in command:
+ if self.ROLE not in command:
return
if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
and self.configured_for_recovery(command[self.ROLE]):
+
self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
- logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
- elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
- and self.configured_for_recovery(command[self.ROLE]):
+ logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of {} to {}".format(
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
+
+ elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START and self.configured_for_recovery(command[self.ROLE]):
self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
- logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
- elif command.has_key('custom_command') and \
- command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
+ logger.info("Received EXECUTION_COMMAND (START), desired state of {} to {}".format(
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
+
+ elif 'custom_command' in command and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
and self.configured_for_recovery(command[self.ROLE]):
+
self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
- logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
+ logger.info("Received EXECUTION_COMMAND (RESTART), desired state of {} to {}".format(
+ command[self.ROLE], self.get_desired_status(command[self.ROLE])))
def get_command(self, component, command_name):
"""
@@ -755,6 +659,10 @@ class RecoveryManager:
self.ROLE: component,
self.COMMAND_ID: command_id
}
+
+ if component in self.__component_to_service_map:
+ command[self.SERVICE_NAME] = self.__component_to_service_map[component]
+
return command
else:
logger.info("Recovery is not enabled. START command will not be computed.")
@@ -786,12 +694,3 @@ class RecoveryManager:
except ValueError:
pass
return int_value
-
-
-def main(argv=None):
- cmd_mgr = RecoveryManager('/tmp')
- pass
-
-
-if __name__ == '__main__':
- main()
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
index b9800bb..432e74b 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -116,14 +116,8 @@ class _TestRecoveryManager(TestCase):
}
}
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
def test_defaults(self):
- rm = RecoveryManager(tempfile.mktemp())
+ rm = RecoveryManager()
self.assertFalse(rm.enabled())
self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
@@ -131,7 +125,6 @@ class _TestRecoveryManager(TestCase):
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertFalse(rm.requires_recovery("NODEMANAGER"))
- pass
@patch.object(RecoveryManager, "_now_")
def test_sliding_window(self, time_mock):
@@ -139,7 +132,7 @@ class _TestRecoveryManager(TestCase):
[1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
- rm = RecoveryManager(tempfile.mktemp(), True, False)
+ rm = RecoveryManager(True, False)
self.assertTrue(rm.enabled())
config = rm.update_config(0, 60, 5, 12, True, False, False, "")
@@ -206,11 +199,12 @@ class _TestRecoveryManager(TestCase):
# lifetime max reached
self.assertTrue(rm.execute("NODEMANAGER2"))
self.assertFalse(rm.execute("NODEMANAGER2"))
- pass
def test_recovery_required(self):
- rm = RecoveryManager(tempfile.mktemp(), True, False)
- rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER")
+ rm = RecoveryManager(True, False)
+ rm.update_config(12, 5, 1, 15, True, False, False, [
+ {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+ ])
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "INSTALLED")
self.assertFalse(rm.requires_recovery("NODEMANAGER"))
@@ -239,7 +233,7 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
- rm = RecoveryManager(tempfile.mktemp(), True, True)
+ rm = RecoveryManager(True, True)
rm.update_current_status("NODEMANAGER", "INIT")
rm.update_desired_status("NODEMANAGER", "INSTALLED")
@@ -253,18 +247,20 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("NODEMANAGER", "START")
self.assertFalse(rm.requires_recovery("NODEMANAGER"))
- pass
-
def test_recovery_required2(self):
- rm = RecoveryManager(tempfile.mktemp(), True, True)
- rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+ rm = RecoveryManager(True, True)
+ rm.update_config(15, 5, 1, 16, True, False, False, [
+ {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+ ])
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
- rm = RecoveryManager(tempfile.mktemp(), True, True)
- rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+ rm = RecoveryManager( True, True)
+ rm.update_config(15, 5, 1, 16, True, False, False, [
+ {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+ ])
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -273,7 +269,7 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("DATANODE", "STARTED")
self.assertFalse(rm.requires_recovery("DATANODE"))
- rm = RecoveryManager(tempfile.mktemp(), True, True)
+ rm = RecoveryManager(True, True)
rm.update_config(15, 5, 1, 16, True, False, False, "")
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -283,7 +279,9 @@ class _TestRecoveryManager(TestCase):
rm.update_desired_status("DATANODE", "STARTED")
self.assertFalse(rm.requires_recovery("DATANODE"))
- rm.update_config(15, 5, 1, 16, True, False, False, "NODEMANAGER")
+ rm.update_config(15, 5, 1, 16, True, False, False, [
+ {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+ ])
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
self.assertTrue(rm.requires_recovery("NODEMANAGER"))
@@ -291,31 +289,30 @@ class _TestRecoveryManager(TestCase):
rm.update_current_status("DATANODE", "INSTALLED")
rm.update_desired_status("DATANODE", "STARTED")
self.assertFalse(rm.requires_recovery("DATANODE"))
- pass
@patch.object(RecoveryManager, "update_config")
def test_update_rm_config(self, mock_uc):
- rm = RecoveryManager(tempfile.mktemp())
+ rm = RecoveryManager()
rm.update_recovery_config(None)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
mock_uc.reset_mock()
rm.update_recovery_config({})
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
mock_uc.reset_mock()
rm.update_recovery_config(
{"recoveryConfig": {
"type" : "DEFAULT"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, [])])
mock_uc.reset_mock()
rm.update_recovery_config(
{"recoveryConfig": {
"type" : "FULL"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, [])])
mock_uc.reset_mock()
rm.update_recovery_config(
@@ -323,7 +320,7 @@ class _TestRecoveryManager(TestCase):
"type" : "AUTO_START",
"max_count" : "med"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, [])])
mock_uc.reset_mock()
rm.update_recovery_config(
@@ -331,28 +328,41 @@ class _TestRecoveryManager(TestCase):
"type" : "AUTO_INSTALL_START",
"max_count" : "med"}}
)
- mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "")])
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, [])])
mock_uc.reset_mock()
rm.update_recovery_config(
{"recoveryConfig": {
- "type" : "AUTO_START",
- "maxCount" : "5",
+ "type": "AUTO_START",
+ "maxCount": "5",
"windowInMinutes" : 20,
- "retryGap" : 2,
+ "retryGap": 2,
"maxLifetimeCount" : 5,
- "components" : " A,B",
- "recoveryTimestamp" : 1}}
+ "components": [
+ {
+ "service_name": "A",
+ "component_name": "A",
+ "desired_state": "INSTALLED"
+ },
+ {
+ "service_name": "B",
+ "component_name": "B",
+ "desired_state": "INSTALLED"
+ }
+ ],
+ "recoveryTimestamp": 1}}
)
- mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B")])
- pass
+ mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, [
+ {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+ {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'}
+ ])])
@patch.object(RecoveryManager, "_now_")
def test_recovery_report(self, time_mock):
time_mock.side_effect = \
[1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1815]
- rm = RecoveryManager(tempfile.mktemp())
+ rm = RecoveryManager()
rec_st = rm.get_recovery_status()
self.assertEquals(rec_st, {"summary": "DISABLED"})
@@ -392,20 +402,21 @@ class _TestRecoveryManager(TestCase):
{"name": "LION", "numAttempts": 4, "limitReached": True},
{"name": "PUMA", "numAttempts": 4, "limitReached": True}
]})
- pass
@patch.object(RecoveryManager, "_now_")
def test_command_expiry(self, time_mock):
time_mock.side_effect = \
[1000, 1001, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812]
- rm = RecoveryManager(tempfile.mktemp(), True)
+ rm = RecoveryManager(True)
rm.update_config(5, 5, 0, 11, True, False, False, "")
command1 = copy.deepcopy(self.command)
#rm.store_or_update_command(command1)
- rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER")
+ rm.update_config(12, 5, 1, 15, True, False, False, [
+ {'component_name': 'NODEMANAGER', 'service_name': 'YARN', 'desired_state': 'INSTALLED'}
+ ])
rm.update_current_status("NODEMANAGER", "INSTALLED")
rm.update_desired_status("NODEMANAGER", "STARTED")
@@ -426,28 +437,38 @@ class _TestRecoveryManager(TestCase):
commands = rm.get_recovery_commands()
self.assertEqual(1, len(commands))
self.assertEqual("START", commands[0]["roleCommand"])
- pass
def test_configured_for_recovery(self):
- rm = RecoveryManager(tempfile.mktemp(), True)
- rm.update_config(12, 5, 1, 15, True, False, False, "A,B")
+ rm = RecoveryManager(True)
+ rm.update_config(12, 5, 1, 15, True, False, False, [
+ {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+ {'component_name': 'B', 'service_name': 'B', 'desired_state': 'INSTALLED'},
+ ])
self.assertTrue(rm.configured_for_recovery("A"))
self.assertTrue(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, "")
+ rm.update_config(5, 5, 1, 11, True, False, False, [])
self.assertFalse(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, "A")
+ rm.update_config(5, 5, 1, 11, True, False, False, [
+ {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
+ ])
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
- rm.update_config(5, 5, 1, 11, True, False, False, "A")
+ rm.update_config(5, 5, 1, 11, True, False, False, [
+ {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'}
+ ])
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
- rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ")
+ rm.update_config(5, 5, 1, 11, True, False, False, [
+ {'component_name': 'A', 'service_name': 'A', 'desired_state': 'INSTALLED'},
+ {'component_name': 'D', 'service_name': 'D', 'desired_state': 'INSTALLED'},
+ {'component_name': 'F', 'service_name': 'F', 'desired_state': 'INSTALLED'}
+ ])
self.assertTrue(rm.configured_for_recovery("A"))
self.assertFalse(rm.configured_for_recovery("B"))
self.assertFalse(rm.configured_for_recovery("C"))
@@ -459,7 +480,7 @@ class _TestRecoveryManager(TestCase):
def test_reset_if_window_passed_since_last_attempt(self, time_mock):
time_mock.side_effect = \
[1000, 1071, 1372]
- rm = RecoveryManager(tempfile.mktemp(), True)
+ rm = RecoveryManager(True)
rm.update_config(2, 5, 1, 4, True, True, False, "")
@@ -478,7 +499,7 @@ class _TestRecoveryManager(TestCase):
@patch.object(RecoveryManager, "_now_")
def test_is_action_info_stale(self, time_mock):
- rm = RecoveryManager(tempfile.mktemp(), True)
+ rm = RecoveryManager(True)
rm.update_config(5, 60, 5, 16, True, False, False, "")
time_mock.return_value = 0
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
index 8e2078d..9e3e455 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -18,8 +18,10 @@
package org.apache.ambari.server.agent;
-import com.google.gson.annotations.SerializedName;
+import java.util.List;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
/**
* Recovery config to be sent to the agent
@@ -33,34 +35,34 @@ public class RecoveryConfig {
}
@SerializedName("type")
- @com.fasterxml.jackson.annotation.JsonProperty("type")
+ @JsonProperty("type")
private String type;
@SerializedName("maxCount")
- @com.fasterxml.jackson.annotation.JsonProperty("maxCount")
+ @JsonProperty("maxCount")
private String maxCount;
@SerializedName("windowInMinutes")
- @com.fasterxml.jackson.annotation.JsonProperty("windowInMinutes")
+ @JsonProperty("windowInMinutes")
private String windowInMinutes;
@SerializedName("retryGap")
- @com.fasterxml.jackson.annotation.JsonProperty("retryGap")
+ @JsonProperty("retryGap")
private String retryGap;
@SerializedName("maxLifetimeCount")
- @com.fasterxml.jackson.annotation.JsonProperty("maxLifetimeCount")
+ @JsonProperty("maxLifetimeCount")
private String maxLifetimeCount;
@SerializedName("components")
- @com.fasterxml.jackson.annotation.JsonProperty("components")
- private String enabledComponents;
+ @JsonProperty("components")
+ private List<RecoveryConfigComponent> enabledComponents;
- public String getEnabledComponents() {
+ public List<RecoveryConfigComponent> getEnabledComponents() {
return enabledComponents;
}
- public void setEnabledComponents(String enabledComponents) {
+ public void setEnabledComponents(List<RecoveryConfigComponent> enabledComponents) {
this.enabledComponents = enabledComponents;
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java
new file mode 100644
index 0000000..50f13b4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigComponent.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import java.util.Objects;
+
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.State;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Holder for component
+ */
+public class RecoveryConfigComponent{
+
+ @SerializedName("component_name")
+ @JsonProperty("component_name")
+ private String componentName;
+
+ @SerializedName("service_name")
+ @JsonProperty("service_name")
+ private String serviceName;
+
+ @SerializedName("desired_state")
+ @JsonProperty("desired_state")
+ private String desiredState;
+
+ /**
+ * Creates new instance of {@link RecoveryConfigComponent}
+ * @param componentName name of the component
+ * @param desiredState desired state of the component
+ */
+ public RecoveryConfigComponent(String componentName, String serviceName, State desiredState){
+ this.setComponentName(componentName);
+ this.setServiceName(serviceName);
+ this.setDesiredState(desiredState);
+ }
+
+ /**
+ * Creates {@link RecoveryConfigComponent} instance from initialized {@link ServiceComponentHost}
+ */
+ public RecoveryConfigComponent(ServiceComponentHost sch) {
+ this(sch.getServiceComponentName(), sch.getServiceName(), sch.getDesiredState());
+ }
+
+ public String getComponentName() {
+ return componentName;
+ }
+
+ public void setComponentName(String componentName) {
+ this.componentName = componentName;
+ }
+
+ public State getDesiredState() {
+ return State.valueOf(desiredState);
+ }
+
+
+ public void setDesiredState(State state) {
+ this.desiredState = state.toString();
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder("{")
+ .append("componentName=").append(componentName)
+ .append(", serviceName=").append(serviceName)
+ .append(", desiredState=").append(desiredState)
+ .append("}");
+ return sb.toString();
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ final RecoveryConfigComponent that = (RecoveryConfigComponent) o;
+ return Objects.equals(componentName, that.componentName) &&
+ Objects.equals(serviceName, that.serviceName) &&
+ Objects.equals(desiredState, that.desiredState);
+ }
+
+ @Override
+ public int hashCode(){
+ return Objects.hash(componentName, serviceName, desiredState);
+ }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
index 668b65e..fbd8a7f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfigHelper.java
@@ -108,7 +108,7 @@ public class RecoveryConfigHelper {
recoveryConfig.setType(autoStartConfig.getNodeRecoveryType());
recoveryConfig.setWindowInMinutes(autoStartConfig.getNodeRecoveryWindowInMin());
if (autoStartConfig.isRecoveryEnabled()) {
- recoveryConfig.setEnabledComponents(StringUtils.join(autoStartConfig.getEnabledComponents(hostname), ','));
+ recoveryConfig.setEnabledComponents(autoStartConfig.getEnabledComponents(hostname));
}
return recoveryConfig;
@@ -316,8 +316,8 @@ public class RecoveryConfigHelper {
* maintenance mode.
* @return
*/
- private List<String> getEnabledComponents(String hostname) throws AmbariException {
- List<String> enabledComponents = new ArrayList<>();
+ private List<RecoveryConfigComponent> getEnabledComponents(String hostname) throws AmbariException {
+ List<RecoveryConfigComponent> enabledComponents = new ArrayList<>();
if (cluster == null) {
return enabledComponents;
@@ -343,7 +343,7 @@ public class RecoveryConfigHelper {
if (service.getMaintenanceState() == MaintenanceState.OFF) {
// Keep the components that are not in maintenance mode.
if (sch.getMaintenanceState() == MaintenanceState.OFF) {
- enabledComponents.add(sch.getServiceComponentName());
+ enabledComponents.add(new RecoveryConfigComponent(sch));
}
}
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
index fb53b1f..738e06e 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/RecoveryConfigHelperTest.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -35,6 +36,7 @@ import java.util.Set;
import org.apache.ambari.server.H2DatabaseCleaner;
import org.apache.ambari.server.agent.HeartbeatTestHelper;
import org.apache.ambari.server.agent.RecoveryConfig;
+import org.apache.ambari.server.agent.RecoveryConfigComponent;
import org.apache.ambari.server.agent.RecoveryConfigHelper;
import org.apache.ambari.server.controller.internal.DeleteHostComponentStatusMetaData;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
@@ -53,6 +55,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.inject.Guice;
@@ -145,8 +148,7 @@ public class RecoveryConfigHelperTest {
* @throws Exception
*/
@Test
- public void testServiceComponentInstalled()
- throws Exception {
+ public void testServiceComponentInstalled() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
@@ -157,7 +159,9 @@ public class RecoveryConfigHelperTest {
// Get the recovery configuration
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
// Install HDFS::NAMENODE to trigger a component installed event
hdfs.addServiceComponent(NAMENODE).setRecoveryEnabled(true);
@@ -165,7 +169,10 @@ public class RecoveryConfigHelperTest {
// Verify the new config
recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+ new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
}
/**
@@ -188,14 +195,19 @@ public class RecoveryConfigHelperTest {
// Get the recovery configuration
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+ new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
// Uninstall HDFS::DATANODE from host1
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).delete(new DeleteHostComponentStatusMetaData());
// Verify the new config
recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
}
/**
@@ -216,7 +228,9 @@ public class RecoveryConfigHelperTest {
// Get the recovery configuration
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INSTALLED)
+ ), recoveryConfig.getEnabledComponents());
// Get cluser-env config and turn off recovery for the cluster
Config config = cluster.getDesiredConfigByType("cluster-env");
@@ -238,8 +252,7 @@ public class RecoveryConfigHelperTest {
* @throws Exception
*/
@Test
- public void testMaintenanceModeChanged()
- throws Exception {
+ public void testMaintenanceModeChanged() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
Service hdfs = cluster.addService(HDFS, repositoryVersion);
@@ -252,13 +265,18 @@ public class RecoveryConfigHelperTest {
// Get the recovery configuration
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE,NAMENODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT),
+ new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
hdfs.getServiceComponent(DATANODE).getServiceComponentHost(DummyHostname1).setMaintenanceState(MaintenanceState.ON);
// Only NAMENODE is left
recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "NAMENODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(NAMENODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
}
/**
@@ -267,8 +285,7 @@ public class RecoveryConfigHelperTest {
* @throws Exception
*/
@Test
- public void testServiceComponentRecoveryChanged()
- throws Exception {
+ public void testServiceComponentRecoveryChanged() throws Exception {
Cluster cluster = heartbeatTestHelper.getDummyCluster();
RepositoryVersionEntity repositoryVersion = helper.getOrCreateRepositoryVersion(cluster);
Service hdfs = cluster.addService(HDFS, repositoryVersion);
@@ -278,14 +295,16 @@ public class RecoveryConfigHelperTest {
// Get the recovery configuration
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
// Turn off auto start for HDFS::DATANODE
hdfs.getServiceComponent(DATANODE).setRecoveryEnabled(false);
// Get the latest config. DATANODE should not be present.
recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), DummyHostname1);
- assertEquals(recoveryConfig.getEnabledComponents(), "");
+ assertEquals(new ArrayList<RecoveryConfigComponent>(), recoveryConfig.getEnabledComponents());
}
/**
@@ -319,7 +338,9 @@ public class RecoveryConfigHelperTest {
// Simulate registration for Host1: Get the recovery configuration right away for Host1.
// It makes an entry for cluster name and Host1 in the timestamp dictionary.
RecoveryConfig recoveryConfig = recoveryConfigHelper.getRecoveryConfig(cluster.getClusterName(), "Host1");
- assertEquals(recoveryConfig.getEnabledComponents(), "DATANODE");
+ assertEquals(Lists.newArrayList(
+ new RecoveryConfigComponent(DATANODE, HDFS, State.INIT)
+ ), recoveryConfig.getEnabledComponents());
// Simulate heartbeat for Host2: When second host heartbeats, it first checks if config stale.
// This should return true since it did not get the configuration during registration.
--
To stop receiving notification emails like this one, please contact
hapylestat@apache.org.