You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/06/20 12:26:36 UTC
ambari git commit: AMBARI-21270. Ability to auto-start component
without server intervention (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-perf 11f16c83a -> 670a08eed
AMBARI-21270. Ability to auto-start component without server intervention (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/670a08ee
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/670a08ee
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/670a08ee
Branch: refs/heads/branch-3.0-perf
Commit: 670a08eed3c8b45b58ab729c8592d6f46af3d4f2
Parents: 11f16c8
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Tue Jun 20 15:25:47 2017 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Tue Jun 20 15:25:47 2017 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 146 ++-------
.../python/ambari_agent/CommandStatusDict.py | 2 +-
.../ambari_agent/ComponentStatusExecutor.py | 5 +-
.../main/python/ambari_agent/HeartbeatThread.py | 2 +-
.../python/ambari_agent/InitializerModule.py | 4 +-
.../main/python/ambari_agent/RecoveryManager.py | 303 +++++++------------
.../listeners/MetadataEventListener.py | 17 +-
.../ambari_agent/TestAgentStompResponses.py | 23 +-
.../dummy_files/stomp/execution_commands.json | 10 +-
.../stomp/metadata_after_registration.json | 12 +-
10 files changed, 176 insertions(+), 348 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 5632b5b..a470697 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -76,6 +76,7 @@ class ActionQueue(threading.Thread):
self.backgroundCommandQueue = Queue.Queue()
self.commandStatuses = initializer_module.commandStatuses
self.config = initializer_module.config
+ self.recovery_manager = initializer_module.recovery_manager
self.configTags = {}
self.stop_event = initializer_module.stop_event
self.tmpdir = self.config.get('agent', 'prefix')
@@ -91,9 +92,6 @@ class ActionQueue(threading.Thread):
command['serviceName'] = "null"
if command.has_key('clusterId'):
command['clusterId'] = "null"
- if not command.has_key('clusterName'):
- command['clusterName'] = 'null'
-
logger.info("Adding " + command['commandType'] + " for role " + \
command['role'] + " for service " + \
@@ -134,6 +132,7 @@ class ActionQueue(threading.Thread):
try:
while not self.stop_event.is_set():
self.processBackgroundQueueSafeEmpty()
+ self.fillRecoveryCommands()
try:
if self.parallel_execution == 0:
command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME)
@@ -168,6 +167,10 @@ class ActionQueue(threading.Thread):
logger.info("ActionQueue thread has successfully finished")
+ def fillRecoveryCommands(self):
+ if not self.tasks_in_progress_or_pending():
+ self.put(self.recovery_manager.get_recovery_commands())
+
def processBackgroundQueueSafeEmpty(self):
while not self.backgroundCommandQueue.empty():
try:
@@ -190,40 +193,34 @@ class ActionQueue(threading.Thread):
try:
if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
try:
- # TODO STOMP: fix recovery manager for execution commands
- #if self.controller.recovery_manager.enabled():
- # self.controller.recovery_manager.start_execution_command()
+ if self.recovery_manager.enabled():
+ self.recovery_manager.on_execution_command_start()
+ self.recovery_manager.process_execution_command(command)
+
self.execute_command(command)
finally:
- pass
- #if self.controller.recovery_manager.enabled():
- # self.controller.recovery_manager.stop_execution_command()
+ if self.recovery_manager.enabled():
+ self.recovery_manager.on_execution_command_finish()
else:
logger.error("Unrecognized command " + pprint.pformat(command))
except Exception:
logger.exception("Exception while processing {0} command".format(commandType))
def tasks_in_progress_or_pending(self):
- return_val = False
- if not self.commandQueue.empty():
- return_val = True
- if self.controller.recovery_manager.has_active_command():
- return_val = True
- return return_val
- pass
+ return not self.commandQueue.empty() or self.recovery_manager.has_active_command()
def execute_command(self, command):
'''
Executes commands of type EXECUTION_COMMAND
'''
- clusterName = command['clusterName']
+ clusterId = command['clusterId']
commandId = command['commandId']
isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
message = "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of " \
- "cluster {cluster}.".format(
+ "cluster_id {cluster}.".format(
commandId = str(commandId), taskId = str(command['taskId']),
- role=command['role'], cluster=clusterName)
+ role=command['role'], cluster=clusterId)
logger.info(message)
taskId = command['taskId']
@@ -359,10 +356,10 @@ class ActionQueue(threading.Thread):
roleResult['stderr'] = 'None'
# let ambari know name of custom command
- """
- if command['hostLevelParams'].has_key('custom_command'):
- roleResult['customCommand'] = command['hostLevelParams']['custom_command']
- """
+
+ if command.has_key('custom_command'):
+ roleResult['customCommand'] = command['custom_command']
+
if 'structuredOut' in commandresult:
roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut']))
else:
@@ -370,30 +367,6 @@ class ActionQueue(threading.Thread):
# let recovery manager know the current state
if status == self.COMPLETED_STATUS:
- # TODO STOMP:fix recovery_manager
- """
- if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
- and self.controller.recovery_manager.configured_for_recovery(command['role']):
- if command['roleCommand'] == self.ROLE_COMMAND_START:
- self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
- self.controller.recovery_manager.update_config_staleness(command['role'], False)
- logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
- ", current state of " + command['role'] + " to " +
- self.controller.recovery_manager.get_current_status(command['role']) )
- elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
- self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS)
- logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
- ", current state of " + command['role'] + " to " +
- self.controller.recovery_manager.get_current_status(command['role']) )
- elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND:
- if command['hostLevelParams'].has_key('custom_command') and \
- command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART:
- self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
- self.controller.recovery_manager.update_config_staleness(command['role'], False)
- logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command['role'] + " to " +
- self.controller.recovery_manager.get_current_status(command['role']) )
- """
-
# let ambari know that configuration tags were applied
configHandler = ActualConfigHandler(self.config, self.configTags)
@@ -428,18 +401,8 @@ class ActionQueue(threading.Thread):
command['hostLevelParams']['clientsToUpdateConfigs'])
roleResult['configurationTags'] = configHandler.read_actual_component(
command['role'])
- elif status == self.FAILED_STATUS:
- # TODO STOMP: recovery manager
- """
- if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \
- and self.controller.recovery_manager.configured_for_recovery(command['role']):
- if command['roleCommand'] == self.ROLE_COMMAND_INSTALL:
- self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED)
- logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
- ", current state of " + command['role'] + " to " +
- self.controller.recovery_manager.get_current_status(command['role']))
- """
+ self.recovery_manager.process_execution_command_result(command, status)
self.commandStatuses.put_command_status(command, roleResult)
def log_command_output(self, text, taskId):
@@ -495,73 +458,6 @@ class ActionQueue(threading.Thread):
component_status_result = self.customServiceOrchestrator.requestComponentStatus(command)
return command, component_status_result
- def process_status_command_result(self, result):
- '''
- Executes commands of type STATUS_COMMAND
- '''
- # TODO STOMP: review if we need to run this with new status commands
- try:
- command, component_status_result = result
- cluster = command['clusterName']
- service = command['serviceName']
- component = command['componentName']
- configurations = command['configurations']
- if configurations.has_key('global'):
- globalConfig = configurations['global']
- else:
- globalConfig = {}
-
- # TODO STOMP: check why we need this
- if not Script.config :
- logger.debug('Setting Script.config to last status command configuration')
- Script.config = command
-
- livestatus = LiveStatus(cluster, service, component,
- globalConfig, self.config, self.configTags)
-
- component_extra = None
-
- if component_status_result['exitcode'] == 0:
- component_status = LiveStatus.LIVE_STATUS
- if self.controller.recovery_manager.enabled() \
- and self.controller.recovery_manager.configured_for_recovery(component):
- self.controller.recovery_manager.update_current_status(component, component_status)
- else:
- component_status = LiveStatus.DEAD_STATUS
- if self.controller.recovery_manager.enabled() \
- and self.controller.recovery_manager.configured_for_recovery(component):
- if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED):
- self.controller.recovery_manager.update_current_status(component, component_status)
-
- request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \
- not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND)
-
- if 'structuredOut' in component_status_result:
- component_extra = component_status_result['structuredOut']
-
- result = livestatus.build(component_status=component_status)
- if self.controller.recovery_manager.enabled():
- result['sendExecCmdDet'] = str(request_execution_cmd)
-
- if component_extra is not None and len(component_extra) != 0:
- if component_extra.has_key('alerts'):
- result['alerts'] = component_extra['alerts']
- del component_extra['alerts']
-
- result['extra'] = component_extra
-
- logger.debug("Got live status for component " + component + \
- " of service " + str(service) + \
- " of cluster " + str(cluster))
-
- logger.debug(pprint.pformat(result))
- if result is not None:
- self.commandStatuses.put_command_status(command, result)
- except Exception, err:
- traceback.print_exc()
- logger.warn(err)
- pass
-
def status_update_callback(self):
"""
Actions that are executed every time when command status changes
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index e27a243..133701f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -136,7 +136,7 @@ class CommandStatusDict():
'role': command['role'],
'actionId': command['commandId'],
'taskId': command['taskId'],
- 'clusterName': command['clusterName'],
+ 'clusterId': command['clusterId'],
'serviceName': command['serviceName'],
'roleCommand': command['roleCommand']
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 520c97d..2ac904f 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -22,6 +22,7 @@ import logging
import threading
from ambari_agent import Constants
+from ambari_agent.LiveStatus import LiveStatus
from collections import defaultdict
logger = logging.getLogger(__name__)
@@ -33,6 +34,7 @@ class ComponentStatusExecutor(threading.Thread):
self.topology_cache = initializer_module.topology_cache
self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
self.stop_event = initializer_module.stop_event
+ self.recovery_manager = initializer_module.recovery_manager
self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server
threading.Thread.__init__(self)
@@ -77,7 +79,7 @@ class ComponentStatusExecutor(threading.Thread):
component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict)
logger.info(component_status_result)
- status = "STARTED" if component_status_result['exitcode'] == 0 else "INSTALLED"
+ status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS
result = {
'serviceName': service_name,
@@ -90,6 +92,7 @@ class ComponentStatusExecutor(threading.Thread):
if status != self.reported_component_status[component_name][command_name]:
logging.info("Status for {0} has changed to {1}".format(component_name, status))
cluster_reports[cluster_id].append(result)
+ self.recovery_manager.handle_status_change(component_name, status)
self.send_updates_to_server(cluster_reports)
except:
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 40e5b12..dbf4006 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -54,7 +54,7 @@ class HeartbeatThread(threading.Thread):
# listeners
self.server_responses_listener = ServerResponsesListener()
self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
- self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache)
+ self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager)
self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener]
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index f0c3b43..8de1fa5 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -31,7 +31,7 @@ from ambari_agent.security import AmbariStompConnection
from ambari_agent.ActionQueue import ActionQueue
from ambari_agent.CommandStatusDict import CommandStatusDict
from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
-from ambari_agent.HostStatusReporter import HostStatusReporter
+from ambari_agent.RecoveryManager import RecoveryManager
logger = logging.getLogger()
@@ -58,6 +58,7 @@ class InitializerModule:
self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5'))
self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
+ self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)
self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
@@ -74,6 +75,7 @@ class InitializerModule:
self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
self.customServiceOrchestrator = CustomServiceOrchestrator(self)
+ self.recovery_manager = RecoveryManager(self.recovery_cache_dir)
self.commandStatuses = CommandStatusDict(self)
self.action_queue = ActionQueue(self)
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
index be335f2..68dd0be 100644
--- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -43,11 +43,12 @@ class RecoveryManager:
COMPONENT_NAME = "componentName"
ROLE = "role"
TASK_ID = "taskId"
+ CLUSTER_ID = "clusterId"
DESIRED_STATE = "desiredState"
HAS_STALE_CONFIG = "hasStaleConfigs"
EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
ROLE_COMMAND = "roleCommand"
- HOST_LEVEL_PARAMS = "hostLevelParams"
+ COMMAND_ID = "commandId"
PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
@@ -85,7 +86,6 @@ class RecoveryManager:
self.retry_gap = 5
self.max_lifetime_count = 12
- self.stored_exec_commands = {}
self.id = int(time.time())
self.allowed_desired_states = [self.STARTED, self.INSTALLED]
self.allowed_current_states = [self.INIT, self.INSTALLED]
@@ -98,6 +98,7 @@ class RecoveryManager:
self.active_command_count = 0
self.paused = False
self.recovery_timestamp = -1
+ self.cluster_id = None
if not os.path.exists(cache_dir):
try:
@@ -113,12 +114,12 @@ class RecoveryManager:
pass
- def start_execution_command(self):
+ def on_execution_command_start(self):
with self.__active_command_lock:
self.active_command_count += 1
pass
- def stop_execution_command(self):
+ def on_execution_command_finish(self):
with self.__active_command_lock:
self.active_command_count -= 1
pass
@@ -160,11 +161,18 @@ class RecoveryManager:
pass
self.statuses[component]["stale_config"] = is_config_stale
- if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
- self.statuses[component]["stale_config"] == False:
- self.remove_command(component)
pass
+ def handle_status_change(self, component, component_status):
+ if not self.enabled() or not self.configured_for_recovery(component):
+ return
+
+ if component_status == LiveStatus.LIVE_STATUS:
+ self.update_current_status(component, component_status)
+ else:
+ if (self.get_current_status(component) != self.INSTALL_FAILED):
+ self.update_current_status(component, component_status)
+
def update_current_status(self, component, state):
"""
Updates the current status of a host component managed by the agent
@@ -184,9 +192,6 @@ class RecoveryManager:
if self.statuses[component]["current"] != state:
logger.info("current status is set to %s for %s", state, component)
self.statuses[component]["current"] = state
- if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
- self.statuses[component]["stale_config"] == False:
- self.remove_command(component)
pass
@@ -209,9 +214,6 @@ class RecoveryManager:
if self.statuses[component]["desired"] != state:
logger.info("desired status is set to %s for %s", state, component)
self.statuses[component]["desired"] = state
- if self.statuses[component]["current"] == self.statuses[component]["desired"] and \
- self.statuses[component]["stale_config"] == False:
- self.remove_command(component)
pass
"""
@@ -354,9 +356,10 @@ class RecoveryManager:
if command:
self.execute(component)
+ logger.info("Created recovery command %s for component %s",
+ command[self.ROLE_COMMAND], command[self.ROLE])
commands.append(command)
return commands
- pass
def may_execute(self, action):
@@ -549,7 +552,7 @@ class RecoveryManager:
pass
- def update_configuration_from_registration(self, reg_resp):
+ def update_recovery_config(self, dictionary):
"""
TODO: Server sends the recovery configuration - call update_config after parsing
"recoveryConfig": {
@@ -573,9 +576,9 @@ class RecoveryManager:
recovery_timestamp = -1 # Default value if recoveryTimestamp is not available.
- if reg_resp and "recoveryConfig" in reg_resp:
- logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"]))
- config = reg_resp["recoveryConfig"]
+ if dictionary and "recoveryConfig" in dictionary:
+ logger.info("RecoverConfig = " + pprint.pformat(dictionary["recoveryConfig"]))
+ config = dictionary["recoveryConfig"]
if "type" in config:
if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]:
recovery_enabled = True
@@ -681,209 +684,113 @@ class RecoveryManager:
def get_unique_task_id(self):
self.id += 1
return self.id
- pass
-
- def process_status_commands(self, commands):
+ def process_execution_command_result(self, command, status):
+ """
+ Update current status for the components depending on command and its status.
+ """
if not self.enabled():
return
-
- if commands and len(commands) > 0:
- for command in commands:
- self.store_or_update_command(command)
- if self.EXECUTION_COMMAND_DETAILS in command:
- logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
-
- pass
-
-
- def process_execution_commands(self, commands):
- if not self.enabled():
+
+ if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']):
return
-
- if commands and len(commands) > 0:
- for command in commands:
- if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
- if self.ROLE in command:
- if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
- and self.configured_for_recovery(command[self.ROLE]):
- self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
- logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
- elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
- and self.configured_for_recovery(command[self.ROLE]):
- self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
- logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
- elif command[self.HOST_LEVEL_PARAMS].has_key('custom_command') and \
- command[self.HOST_LEVEL_PARAMS]['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
- and self.configured_for_recovery(command[self.ROLE]):
- self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
- logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
- self.get_desired_status(command[self.ROLE]) )
-
- pass
-
-
- def store_or_update_command(self, command):
- """
- Stores command details by reading them from the STATUS_COMMAND
- Update desired state as well
+
+ if status == ActionQueue.COMPLETED_STATUS:
+ if command[self.ROLE_COMMAND] == self.ROLE_COMMAND_START:
+ self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+ #self.update_config_staleness(command['role'], False)
+ logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) +
+ ", current state of " + command[self.ROLE] + " to " +
+ self.get_current_status(command[self.ROLE]) )
+ elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+ self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+ logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) +
+ ", current state of " + command[self.ROLE] + " to " +
+ self.get_current_status(command[self.ROLE]) )
+ elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND:
+ if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART:
+ self.update_current_status(command['role'], LiveStatus.LIVE_STATUS)
+ #self.update_config_staleness(command['role'], False)
+ logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " +
+ self.get_current_status(command[self.ROLE]) )
+ elif status == ActionQueue.FAILED_STATUS:
+ if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+ self.update_current_status(command[self.ROLE], self.INSTALL_FAILED)
+ logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) +
+ ", current state of " + command[self.ROLE] + " to " +
+ self.get_current_status(command[self.ROLE]))
+
+ def process_execution_command(self, command):
+ """
+ Change desired state of the component depending on the execution command triggered.
"""
if not self.enabled():
return
+
+ if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+ return
+
+ if not self.ROLE in command:
+ return
- logger.debug("Inspecting command to store/update details")
- if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
- payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
- if self.PAYLOAD_LEVEL in command:
- payloadLevel = command[self.PAYLOAD_LEVEL]
-
- component = command[self.COMPONENT_NAME]
- self.update_desired_status(component, command[self.DESIRED_STATE])
- self.update_config_staleness(component, command[self.HAS_STALE_CONFIG])
-
- if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
- if self.EXECUTION_COMMAND_DETAILS in command:
- # Store the execution command details
- self.remove_command(component)
- self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS])
- logger.debug("Stored command details for " + component)
- else:
- logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
- pass
- pass
-
-
- def get_install_command(self, component):
- if self.paused:
- logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
- return None
-
- if self.enabled():
- logger.debug("Using stored INSTALL command for %s", component)
- if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
- command = copy.deepcopy(self.stored_exec_commands[component])
- command[self.ROLE_COMMAND] = "INSTALL"
- command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
- command[self.TASK_ID] = self.get_unique_task_id()
- return command
- else:
- logger.info("INSTALL command cannot be computed as details are not received from Server.")
- else:
- logger.info("Recovery is not enabled. INSTALL command will not be computed.")
- return None
- pass
-
- def get_stop_command(self, component):
- if self.paused:
- logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
- return None
-
- if self.enabled():
- logger.debug("Using stored STOP command for %s", component)
- if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
- command = copy.deepcopy(self.stored_exec_commands[component])
- command[self.ROLE_COMMAND] = "STOP"
- command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
- command[self.TASK_ID] = self.get_unique_task_id()
- return command
- else:
- logger.info("STOP command cannot be computed as details are not received from Server.")
- else:
- logger.info("Recovery is not enabled. STOP command will not be computed.")
- return None
- pass
-
- def get_restart_command(self, component):
- if self.paused:
- logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
- return None
-
- if self.enabled():
- logger.debug("Using stored INSTALL command for %s", component)
- if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
- command = copy.deepcopy(self.stored_exec_commands[component])
- command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
- command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
- command[self.TASK_ID] = self.get_unique_task_id()
- command[self.HOST_LEVEL_PARAMS]['custom_command'] = 'RESTART'
- return command
- else:
- logger.info("RESTART command cannot be computed as details are not received from Server.")
- else:
- logger.info("Recovery is not enabled. RESTART command will not be computed.")
- return None
- pass
-
-
- def get_start_command(self, component):
+ if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \
+ and self.configured_for_recovery(command[self.ROLE]):
+ self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+ logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " +
+ self.get_desired_status(command[self.ROLE]) )
+ elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \
+ and self.configured_for_recovery(command[self.ROLE]):
+ self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+ logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " +
+ self.get_desired_status(command[self.ROLE]) )
+ elif command.has_key('custom_command') and \
+ command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \
+ and self.configured_for_recovery(command[self.ROLE]):
+ self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+ logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " +
+ self.get_desired_status(command[self.ROLE]) )
+
+ def get_command(self, component, command_name):
+ """
+ Get command dictionary by component name and command_name
+ """
if self.paused:
logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.")
return None
if self.enabled():
- logger.debug("Using stored START command for %s", component)
- if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
- command = copy.deepcopy(self.stored_exec_commands[component])
- command[self.ROLE_COMMAND] = "START"
- command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
- command[self.TASK_ID] = self.get_unique_task_id()
- return command
- else:
- logger.info("START command cannot be computed as details are not received from Server.")
+ command_id = self.get_unique_task_id()
+ command = {
+ self.CLUSTER_ID: self.cluster_id,
+ self.ROLE_COMMAND: command_name,
+ self.COMMAND_TYPE: ActionQueue.AUTO_EXECUTION_COMMAND,
+ self.TASK_ID: command_id,
+ self.ROLE: component,
+ self.COMMAND_ID: command_id
+ }
+ return command
else:
logger.info("Recovery is not enabled. START command will not be computed.")
return None
- pass
-
-
- def command_exists(self, component, command_type):
- if command_type == ActionQueue.EXECUTION_COMMAND:
- self.remove_stale_command(component)
- if component in self.stored_exec_commands:
- return True
-
- return False
- pass
+ def get_restart_command(self, component):
+ command = self.get_command(component, "CUSTOM_COMMAND")
- def remove_stale_command(self, component):
- component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
- if component in self.stored_exec_commands:
- insert_time = self.stored_exec_commands[component_update_key]
- age = self._now_() - insert_time
- if self.COMMAND_REFRESH_DELAY_SEC < age:
- logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old")
- self.remove_command(component)
- pass
+ if command is not None:
+ command[self.ROLE_COMMAND] = "CUSTOM_COMMAND"
+ command['custom_command'] = 'RESTART'
+ return command
- def remove_command(self, component):
- if component in self.stored_exec_commands:
- self.__status_lock.acquire()
- try:
- component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
- del self.stored_exec_commands[component]
- del self.stored_exec_commands[component_update_key]
- logger.debug("Removed stored command for component : " + str(component))
- return True
- finally:
- self.__status_lock.release()
- return False
-
+ def get_install_command(self, component):
+ return self.get_command(component, "INSTALL")
- def add_command(self, component, command):
- self.__status_lock.acquire()
- try:
- component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component)
- self.stored_exec_commands[component] = command
- self.stored_exec_commands[component_update_key] = self._now_()
- logger.debug("Added command for component : " + str(component))
- finally:
- self.__status_lock.release()
+ def get_stop_command(self, component):
+ return self.get_command(component, "STOP")
+ def get_start_command(self, component):
+ return self.get_command(component, "START")
def _read_int_(self, value, default_value=0):
int_value = default_value
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index 788d381..364d8af 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -32,8 +32,9 @@ class MetadataEventListener(EventListener):
"""
Listener of Constants.METADATA_TOPIC events from server.
"""
- def __init__(self, configuration_cache):
- self.topology_cache = configuration_cache
+ def __init__(self, metadata_cache, recovery_manager):
+ self.metadata_cache = metadata_cache
+ self.recovery_manager = recovery_manager
def on_event(self, headers, message):
"""
@@ -46,8 +47,16 @@ class MetadataEventListener(EventListener):
if message == {}:
return
- self.topology_cache.rewrite_cache(message['clusters'])
- self.topology_cache.hash = message['hash']
+ self.metadata_cache.rewrite_cache(message['clusters'])
+ self.metadata_cache.hash = message['hash']
+
+ # FIXME: Recovery manager does not support multiple cluster as of now.
+ cluster_id = message['clusters'].keys()[0]
+
+ if 'recoveryConfig' in message['clusters'][cluster_id]:
+ logging.info("Updating recoveryConfig from metadata")
+ self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id])
+ self.recovery_manager.cluster_id = cluster_id
def get_handled_path(self):
return Constants.METADATA_TOPIC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index f53097f..c41f87e 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -104,12 +104,14 @@ class TestAgentStompResponses(BaseStompServerTestCase):
metadata_subscribe_frame = self.server.frames_queue.get()
topologies_subscribe_frame = self.server.frames_queue.get()
heartbeat_frame = self.server.frames_queue.get()
- dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
- dn_start_failed_frame = json.loads(self.server.frames_queue.get().body)
- zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body)
- zk_start_failed_frame = json.loads(self.server.frames_queue.get().body)
+ dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ dn_install_failed_frame = json.loads(self.server.frames_queue.get().body)
+ zk_install_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ zk_install_failed_frame = json.loads(self.server.frames_queue.get().body)
action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+ dn_recovery_in_progress_frame = json.loads(self.server.frames_queue.get().body)
+ dn_recovery_failed_frame = json.loads(self.server.frames_queue.get().body)
host_status_report = json.loads(self.server.frames_queue.get().body)
initializer_module.stop_event.set()
@@ -129,10 +131,13 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org')
self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
- self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START')
- self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE')
- self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
- self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
+ self.assertEquals(dn_install_in_progress_frame[0]['roleCommand'], 'INSTALL')
+ self.assertEquals(dn_install_in_progress_frame[0]['role'], 'DATANODE')
+ self.assertEquals(dn_install_in_progress_frame[0]['status'], 'IN_PROGRESS')
+ self.assertEquals(dn_install_failed_frame[0]['status'], 'FAILED')
+ self.assertEquals(dn_recovery_in_progress_frame[0]['roleCommand'], 'INSTALL')
+ self.assertEquals(dn_recovery_in_progress_frame[0]['role'], 'DATANODE')
+ self.assertEquals(dn_recovery_in_progress_frame[0]['status'], 'IN_PROGRESS')
#============================================================================================
#============================================================================================
@@ -253,7 +258,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
self.assertEquals(json_topology, json_excepted_lopology)
#self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json"))
- self.assert_with_retries(is_json_equal, tries=40, try_sleep=0.1)
+ self.assert_with_retries(is_json_equal, tries=80, try_sleep=0.1)
initializer_module.stop_event.set()
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
index 6e84319..075699e 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json
@@ -9,8 +9,7 @@
"serviceName":"HDFS",
"role":"DATANODE",
"commandType":"EXECUTION_COMMAND",
- "roleCommand":"START",
- "clusterName": "c1",
+ "roleCommand":"INSTALL",
"clusterId": "0",
"configuration_credentials":{
@@ -19,7 +18,7 @@
"service_package_folder":"common-services/HDFS/2.1.0.2.0/package",
"hooks_folder":"HDP/2.0.6/hooks",
"script":"scripts/datanode.py",
- "phase":"INITIAL_START",
+ "phase":"INITIAL_INSTALL",
"max_duration_for_retries":"600",
"command_retry_enabled":"false",
"command_timeout":"1200",
@@ -35,8 +34,7 @@
"serviceName":"ZOOKEEPER",
"role":"ZOOKEEPER_SERVER",
"commandType":"EXECUTION_COMMAND",
- "roleCommand":"START",
- "clusterName": "c1",
+ "roleCommand":"INSTALL",
"configuration_credentials":{
},
@@ -44,7 +42,7 @@
"service_package_folder":"common-services/ZOOKEEPER/3.4.5/package",
"hooks_folder":"HDP/2.0.6/hooks",
"script":"scripts/datanode.py",
- "phase":"INITIAL_START",
+ "phase":"INITIAL_INSTALL",
"max_duration_for_retries":"600",
"command_retry_enabled":"false",
"command_timeout":"1200",
http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index f60b49a..6462ccf 100644
--- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -25,7 +25,8 @@
"ambari_db_rca_driver": "org.postgresql.Driver",
"java_home": "/usr/jdk64/jdk1.8.0_112",
"user_list": "[\"zookeeper\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]",
- "hooks_folder": "HDP/2.0.6/hooks"
+ "hooks_folder": "HDP/2.0.6/hooks",
+ "cluster_name": "cl1"
},
"serviceLevelParams": {
"HDFS": {
@@ -37,7 +38,14 @@
},
"status_commands_to_run": [
"STATUS"
- ]
+ ],
+ "recoveryConfig": {
+ "type" : "AUTO_INSTALL_START",
+ "maxCount" : 10,
+ "windowInMinutes" : 60,
+ "components" : "NAMENODE,DATANODE",
+ "recoveryTimestamp" : 1458150424380
+ }
}
}
}
\ No newline at end of file