You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sm...@apache.org on 2015/04/15 00:32:32 UTC
[2/2] ambari git commit: AMBARI-10029. Node auto-recovery (phase-I)
AMBARI-10029. Node auto-recovery (phase-I)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dcbf12ef
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dcbf12ef
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dcbf12ef
Branch: refs/heads/trunk
Commit: dcbf12ef92b3922d13e5fb00bc5551fd927cbf08
Parents: a58352b
Author: Sumit Mohanty <sm...@hortonworks.com>
Authored: Tue Apr 14 15:28:02 2015 -0700
Committer: Sumit Mohanty <sm...@hortonworks.com>
Committed: Tue Apr 14 15:28:16 2015 -0700
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 38 +-
.../src/main/python/ambari_agent/Controller.py | 23 +-
.../ambari_agent/CustomServiceOrchestrator.py | 8 +-
.../src/main/python/ambari_agent/DataCleaner.py | 5 +-
.../src/main/python/ambari_agent/Heartbeat.py | 3 +
.../main/python/ambari_agent/RecoveryManager.py | 563 +++++++++++++++++++
.../test/python/ambari_agent/TestActionQueue.py | 3 +
.../test/python/ambari_agent/TestController.py | 106 ++++
.../python/ambari_agent/TestRecoveryManager.py | 430 ++++++++++++++
.../actionmanager/ExecutionCommandWrapper.java | 6 +-
.../ambari/server/agent/AgentRequests.java | 85 +++
.../server/agent/ComponentRecoveryReport.java | 67 +++
.../ambari/server/agent/ComponentStatus.java | 9 +
.../apache/ambari/server/agent/HeartBeat.java | 10 +
.../ambari/server/agent/HeartBeatHandler.java | 24 +-
.../ambari/server/agent/HeartbeatMonitor.java | 33 +-
.../ambari/server/agent/RecoveryConfig.java | 113 ++++
.../ambari/server/agent/RecoveryReport.java | 67 +++
.../server/agent/RegistrationResponse.java | 13 +-
.../ambari/server/agent/StatusCommand.java | 47 +-
.../server/configuration/Configuration.java | 68 ++-
.../controller/AmbariManagementController.java | 10 +
.../AmbariManagementControllerImpl.java | 106 +++-
.../ambari/server/controller/HostResponse.java | 47 +-
.../internal/HostResourceProvider.java | 10 +-
.../controller/internal/URLStreamProvider.java | 4 +-
.../org/apache/ambari/server/state/Host.java | 12 +
.../ambari/server/state/host/HostImpl.java | 24 +
.../src/main/resources/properties.json | 2 +
.../server/agent/TestHeartbeatHandler.java | 137 ++++-
.../AmbariManagementControllerTest.java | 45 ++
.../internal/HostResourceProviderTest.java | 112 +++-
32 files changed, 2166 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 0979d79..c4e2c33 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -53,6 +53,7 @@ class ActionQueue(threading.Thread):
STATUS_COMMAND = 'STATUS_COMMAND'
EXECUTION_COMMAND = 'EXECUTION_COMMAND'
+ AUTO_EXECUTION_COMMAND = 'AUTO_EXECUTION_COMMAND'
BACKGROUND_EXECUTION_COMMAND = 'BACKGROUND_EXECUTION_COMMAND'
ROLE_COMMAND_INSTALL = 'INSTALL'
ROLE_COMMAND_START = 'START'
@@ -91,8 +92,8 @@ class ActionQueue(threading.Thread):
for command in commands:
logger.info("Adding " + command['commandType'] + " for service " + \
- command['serviceName'] + " of cluster " + \
- command['clusterName'] + " to the queue.")
+ command['serviceName'] + " of cluster " + \
+ command['clusterName'] + " to the queue.")
self.statusCommandQueue.put(command)
def put(self, commands):
@@ -102,7 +103,8 @@ class ActionQueue(threading.Thread):
if not command.has_key('clusterName'):
command['clusterName'] = 'null'
- logger.info("Adding " + command['commandType'] + " for service " + \
+ logger.info("Adding " + command['commandType'] + " for role " + \
+ command['role'] + " for service " + \
command['serviceName'] + " of cluster " + \
command['clusterName'] + " to the queue.")
if command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND :
@@ -174,7 +176,7 @@ class ActionQueue(threading.Thread):
commandType = command['commandType']
logger.debug("Took an element of Queue (command type = %s)." % commandType)
try:
- if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND]:
+ if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]:
self.execute_command(command)
elif commandType == self.STATUS_COMMAND:
self.execute_status_command(command)
@@ -192,6 +194,7 @@ class ActionQueue(threading.Thread):
clusterName = command['clusterName']
commandId = command['commandId']
isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND
+ isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND
message = "Executing command with id = {commandId} for role = {role} of " \
"cluster {cluster}.".format(
commandId = str(commandId), role=command['role'],
@@ -203,12 +206,21 @@ class ActionQueue(threading.Thread):
in_progress_status = self.commandStatuses.generate_report_template(command)
# The path of the files that contain the output log and error log use a prefix that the agent advertises to the
# server. The prefix is defined in agent-config.ini
- in_progress_status.update({
- 'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
- 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
- 'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
- 'status': self.IN_PROGRESS_STATUS
- })
+ if not isAutoExecuteCommand:
+ in_progress_status.update({
+ 'tmpout': self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt',
+ 'tmperr': self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt',
+ 'structuredOut' : self.tmpdir + os.sep + 'structured-out-' + str(taskId) + '.json',
+ 'status': self.IN_PROGRESS_STATUS
+ })
+ else:
+ in_progress_status.update({
+ 'tmpout': self.tmpdir + os.sep + 'auto_output-' + str(taskId) + '.txt',
+ 'tmperr': self.tmpdir + os.sep + 'auto_errors-' + str(taskId) + '.txt',
+ 'structuredOut' : self.tmpdir + os.sep + 'auto_structured-out-' + str(taskId) + '.json',
+ 'status': self.IN_PROGRESS_STATUS
+ })
+
self.commandStatuses.put_command_status(command, in_progress_status)
# running command
@@ -322,6 +334,7 @@ class ActionQueue(threading.Thread):
globalConfig, self.config, self.configTags)
component_extra = None
+ request_execution_cmd = False
# For custom services, responsibility to determine service status is
# delegated to python scripts
@@ -330,13 +343,18 @@ class ActionQueue(threading.Thread):
if component_status_result['exitcode'] == 0:
component_status = LiveStatus.LIVE_STATUS
+ self.controller.recovery_manager.update_current_status(component, component_status)
else:
component_status = LiveStatus.DEAD_STATUS
+ self.controller.recovery_manager.update_current_status(component, component_status)
+ request_execution_cmd = self.controller.recovery_manager.requires_recovery(component)
if component_status_result.has_key('structuredOut'):
component_extra = component_status_result['structuredOut']
result = livestatus.build(forsed_component_status= component_status)
+ if self.controller.recovery_manager.enabled():
+ result['sendExecCmdDet'] = str(request_execution_cmd)
# Add security state to the result
result['securityState'] = component_security_status_result
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 3f3e55d..959c213 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -44,6 +44,7 @@ from ambari_agent.NetUtil import NetUtil
from ambari_agent.LiveStatus import LiveStatus
from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler
from ambari_agent.ClusterConfiguration import ClusterConfiguration
+from ambari_agent.RecoveryManager import RecoveryManager
from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers
logger = logging.getLogger()
@@ -81,6 +82,7 @@ class Controller(threading.Thread):
self.heartbeat_stop_callback = heartbeat_stop_callback
# List of callbacks that are called at agent registration
self.registration_listeners = []
+ self.recovery_manager = RecoveryManager()
# pull config directory out of config
cache_dir = config.get('agent', 'cache_dir')
@@ -128,8 +130,10 @@ class Controller(threading.Thread):
self.hostname, prettyData)
ret = self.sendRequest(self.registerUrl, data)
+ prettyData = pprint.pformat(ret)
+ logger.debug("Registration response is %s", prettyData)
- # exitstatus is a code of error which was rised on server side.
+ # exitstatus is a code of error which was raised on server side.
# exitstatus = 0 (OK - Default)
# exitstatus = 1 (Registration failed because different version of agent and server)
exitstatus = 0
@@ -158,17 +162,20 @@ class Controller(threading.Thread):
# always update cached cluster configurations on registration
self.cluster_configuration.update_configurations_from_heartbeat(ret)
+ self.recovery_manager.update_configuration_from_registration(ret)
+
# always update alert definitions on registration
self.alert_scheduler_handler.update_definitions(ret)
except ssl.SSLError:
self.repeatRegistration = False
self.isRegistered = False
return
- except Exception:
+ except Exception, ex:
# try a reconnect only after a certain amount of random time
delay = randint(0, self.range)
logger.error("Unable to connect to: " + self.registerUrl, exc_info=True)
- """ Sleeping for {0} seconds and then retrying again """.format(delay)
+ logger.error("Error:" + str(ex))
+ logger.warn(""" Sleeping for {0} seconds and then trying again """.format(delay,))
time.sleep(delay)
return ret
@@ -265,11 +272,21 @@ class Controller(threading.Thread):
if 'executionCommands' in response_keys:
execution_commands = response['executionCommands']
+ self.recovery_manager.process_execution_commands(execution_commands)
self.addToQueue(execution_commands)
if 'statusCommands' in response_keys:
+ # try storing execution command details and desired state
+ self.recovery_manager.process_status_commands(response['statusCommands'])
self.addToStatusQueue(response['statusCommands'])
+ if self.actionQueue.commandQueue.empty():
+ recovery_commands = self.recovery_manager.get_recovery_commands()
+ for recovery_command in recovery_commands:
+ logger.info("Adding recovery command %s for component %s",
+ recovery_command['roleCommand'], recovery_command['role'])
+ self.addToQueue([recovery_command])
+
if 'alertDefinitionCommands' in response_keys:
self.alert_scheduler_handler.update_definitions(response)
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 925cfff..72fb0af 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -230,6 +230,7 @@ class CustomServiceOrchestrator():
override_output_files=True # by default, we override status command output
if logger.level == logging.DEBUG:
override_output_files = False
+
res = self.runCommand(command, self.status_commands_stdout,
self.status_commands_stderr, self.COMMAND_NAME_STATUS,
override_output_files=override_output_files)
@@ -267,7 +268,7 @@ class CustomServiceOrchestrator():
def resolve_script_path(self, base_dir, script):
"""
- Incapsulates logic of script location determination.
+ Encapsulates logic of script location determination.
"""
path = os.path.join(base_dir, script)
if not os.path.exists(path):
@@ -305,7 +306,7 @@ class CustomServiceOrchestrator():
command_type = command['commandType']
from ActionQueue import ActionQueue # To avoid cyclic dependency
if command_type == ActionQueue.STATUS_COMMAND:
- # These files are frequently created, thats why we don't
+ # These files are frequently created, that's why we don't
# store them all, but only the latest one
file_path = os.path.join(self.tmp_dir, "status_command.json")
else:
@@ -313,6 +314,9 @@ class CustomServiceOrchestrator():
if 'clusterHostInfo' in command and command['clusterHostInfo']:
command['clusterHostInfo'] = self.decompressClusterHostInfo(command['clusterHostInfo'])
file_path = os.path.join(self.tmp_dir, "command-{0}.json".format(task_id))
+ if command_type == ActionQueue.AUTO_EXECUTION_COMMAND:
+ file_path = os.path.join(self.tmp_dir, "auto_command-{0}.json".format(task_id))
+
# Json may contain passwords, that's why we need proper permissions
if os.path.isfile(file_path):
os.unlink(file_path)
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
index e767be1..f756eb4 100644
--- a/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
+++ b/ambari-agent/src/main/python/ambari_agent/DataCleaner.py
@@ -28,7 +28,10 @@ import logging
logger = logging.getLogger()
class DataCleaner(threading.Thread):
- FILE_NAME_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+ COMMAND_FILE_NAMES_PATTERN = 'errors-\d+.txt|output-\d+.txt|site-\d+.pp|structured-out-\d+.json|command-\d+.json'
+ AUTO_COMMAND_FILE_NAMES_PATTERN = \
+ 'auto_command-\d+.json|auto_errors-\d+.txt|auto_output-\d+.txt|auto_structured-out-\d+.json'
+ FILE_NAME_PATTERN = AUTO_COMMAND_FILE_NAMES_PATTERN + "|" + COMMAND_FILE_NAMES_PATTERN
def __init__(self, config):
threading.Thread.__init__(self)
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
index 4b4937e..27aef04 100644
--- a/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
+++ b/ambari-agent/src/main/python/ambari_agent/Heartbeat.py
@@ -55,6 +55,9 @@ class Heartbeat:
'nodeStatus' : nodeStatus
}
+ rec_status = self.actionQueue.controller.recovery_manager.get_recovery_status()
+ heartbeat['recoveryReport'] = rec_status
+
commandsInProgress = False
if not self.actionQueue.commandQueue.empty():
commandsInProgress = True
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
new file mode 100644
index 0000000..3deec2b
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py
@@ -0,0 +1,563 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import copy
+import time
+import threading
+import pprint
+
+from ambari_agent.ActionQueue import ActionQueue
+from ambari_agent.LiveStatus import LiveStatus
+
+
+logger = logging.getLogger()
+
+"""
+RecoveryManager has the following capabilities:
+* Store data needed for execution commands extracted from STATUS command
+* Generate INSTALL command
+* Generate START command
+"""
+
+
+class RecoveryManager:
+ COMMAND_TYPE = "commandType"
+ PAYLOAD_LEVEL = "payloadLevel"
+ COMPONENT_NAME = "componentName"
+ ROLE = "role"
+ TASK_ID = "taskId"
+ DESIRED_STATE = "desiredState"
+ EXECUTION_COMMAND_DETAILS = "executionCommandDetails"
+ ROLE_COMMAND = "roleCommand"
+ PAYLOAD_LEVEL_DEFAULT = "DEFAULT"
+ PAYLOAD_LEVEL_MINIMAL = "MINIMAL"
+ PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND"
+ STARTED = "STARTED"
+ INSTALLED = "INSTALLED"
+ INIT = "INIT" # TODO: What is the state when machine is reset
+
+ default_action_counter = {
+ "lastAttempt": 0,
+ "count": 0,
+ "lastReset": 0,
+ "lifetimeCount" : 0,
+ "warnedLastAttempt": False,
+ "warnedLastReset": False,
+ "warnedThresholdReached": False
+ }
+
+
+ def __init__(self, recovery_enabled=False, auto_start_only=False):
+ self.recovery_enabled = recovery_enabled
+ self.auto_start_only = auto_start_only
+ self.max_count = 6
+ self.window_in_min = 60
+ self.retry_gap = 5
+ self.max_lifetime_count = 12
+
+ self.stored_exec_commands = {}
+ self.id = int(time.time())
+ self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+ self.allowed_current_states = [self.INIT, self.INSTALLED]
+ self.actions = {}
+ self.statuses = {}
+ self.__status_lock = threading.RLock()
+
+ self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only)
+
+ pass
+
+
+ def enabled(self):
+ return self.recovery_enabled
+
+
+ def update_current_status(self, component, state):
+ """
+ Updates the current status of a host component managed by the agent
+ """
+ if component not in self.statuses:
+ self.__status_lock.acquire()
+ try:
+ if component not in self.statuses:
+ self.statuses[component] = {
+ "current": "",
+ "desired": ""
+ }
+ finally:
+ self.__status_lock.release()
+ pass
+
+ self.statuses[component]["current"] = state
+ pass
+
+
+ def update_desired_status(self, component, state):
+ """
+ Updates the desired status of a host component managed by the agent
+ """
+ if component not in self.statuses:
+ self.__status_lock.acquire()
+ try:
+ if component not in self.statuses:
+ self.statuses[component] = {
+ "current": "",
+ "desired": ""
+ }
+ finally:
+ self.__status_lock.release()
+ pass
+
+ self.statuses[component]["desired"] = state
+ pass
+
+
+ def requires_recovery(self, component):
+ """
+ Recovery is allowed for:
+ INSTALLED --> STARTED
+ INIT --> INSTALLED --> STARTED
+ CLIENTs may be RE-INSTALLED (TODO)
+ """
+ if not self.enabled():
+ return False
+
+ if component not in self.statuses:
+ return False
+
+ status = self.statuses[component]
+ if status["current"] == status["desired"]:
+ return False
+
+ if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states:
+ return False
+
+ ### No recovery from STARTED down to INSTALLED or INIT states
+ if status["current"] == self.STARTED:
+ return False
+
+ logger.info("%s needs recovery.", component)
+ return True
+ pass
+
+
+
+ def get_recovery_status(self):
+ """
+ Creates a status in the form
+ {
+ "summary" : "RECOVERABLE|DISABLED|PARTIALLY_RECOVERABLE|UNRECOVERABLE",
+ "componentReports" : [
+ {
+ "name": "component_name",
+ "numAttempts" : "x",
+ "limitReached" : "true|false",
+ "status" : "REQUIRES_RECOVERY|RECOVERY_COMMAND_REQUESTED|RECOVERY_COMMAND_ISSUED|NO_RECOVERY_NEEDED"
+ }
+ ]
+ }
+ """
+ report = {}
+ report["summary"] = "DISABLED"
+ if self.enabled():
+ report["summary"] = "RECOVERABLE"
+ num_limits_reached = 0
+ recovery_states = []
+ report["componentReports"] = recovery_states
+ self.__status_lock.acquire()
+ try:
+ for component in self.actions.keys():
+ action = self.actions[component]
+ recovery_state = {}
+ recovery_state["name"] = component
+ recovery_state["numAttempts"] = action["lifetimeCount"]
+ recovery_state["limitReached"] = self.max_lifetime_count <= action["lifetimeCount"]
+ recovery_states.append(recovery_state)
+ if recovery_state["limitReached"] == True:
+ num_limits_reached += 1
+ pass
+ finally:
+ self.__status_lock.release()
+
+ if num_limits_reached > 0:
+ report["summary"] = "PARTIALLY_RECOVERABLE"
+ if num_limits_reached == len(recovery_states):
+ report["summary"] = "UNRECOVERABLE"
+
+ return report
+ pass
+
+ def get_recovery_commands(self):
+ """
+ This method computes the recovery commands for the following transitions
+ INSTALLED --> STARTED
+ INIT --> INSTALLED
+ """
+ commands = []
+ for component in self.statuses.keys():
+ if self.requires_recovery(component) and self.may_execute(component):
+ status = copy.deepcopy(self.statuses[component])
+ if status["desired"] == self.STARTED:
+ if status["current"] == self.INSTALLED:
+ command = self.get_start_command(component)
+ elif status["current"] == self.INIT:
+ command = self.get_install_command(component)
+ elif status["desired"] == self.INSTALLED:
+ if status["current"] == self.INIT:
+ command = self.get_install_command(component)
+ if command:
+ self.execute(component)
+ commands.append(command)
+ return commands
+ pass
+
+
+ def may_execute(self, action):
+ """
+ Check if an action can be executed
+ """
+ if not action or action.strip() == "":
+ return False
+
+ if action not in self.actions:
+ self.__status_lock.acquire()
+ try:
+ self.actions[action] = copy.deepcopy(self.default_action_counter)
+ finally:
+ self.__status_lock.release()
+ return self._execute_action_chk_only(action)
+ pass
+
+
+ def execute(self, action):
+ """
+ Executes an action
+ """
+ if not action or action.strip() == "":
+ return False
+
+ if action not in self.actions:
+ self.__status_lock.acquire()
+ try:
+ self.actions[action] = copy.deepcopy(self.default_action_counter)
+ finally:
+ self.__status_lock.release()
+ return self._execute_action_(action)
+ pass
+
+
+ def _execute_action_(self, action_name):
+ """
+ _private_ implementation of [may] execute
+ """
+ action_counter = self.actions[action_name]
+ now = self._now_()
+ seconds_since_last_attempt = now - action_counter["lastAttempt"]
+ if action_counter["lifetimeCount"] < self.max_lifetime_count:
+ if action_counter["count"] < self.max_count:
+ if seconds_since_last_attempt > self.retry_gap_in_sec:
+ action_counter["count"] += 1
+ action_counter["lifetimeCount"] +=1
+ if self.retry_gap > 0:
+ action_counter["lastAttempt"] = now
+ action_counter["warnedLastAttempt"] = False
+ if action_counter["count"] == 1:
+ action_counter["lastReset"] = now
+ return True
+ else:
+ if action_counter["warnedLastAttempt"] == False:
+ action_counter["warnedLastAttempt"] = True
+ logger.warn(
+ "%s seconds has not passed since last occurrence %s seconds back for %s. " +
+ "Will silently skip execution without warning till retry gap is passed",
+ self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+ else:
+ logger.debug(
+ "%s seconds has not passed since last occurrence %s seconds back for %s",
+ self.retry_gap_in_sec, seconds_since_last_attempt, action_name)
+ else:
+ sec_since_last_reset = now - action_counter["lastReset"]
+ if sec_since_last_reset > self.window_in_sec:
+ action_counter["count"] = 1
+ action_counter["lifetimeCount"] +=1
+ if self.retry_gap > 0:
+ action_counter["lastAttempt"] = now
+ action_counter["lastReset"] = now
+ action_counter["warnedLastReset"] = False
+ return True
+ else:
+ if action_counter["warnedLastReset"] == False:
+ action_counter["warnedLastReset"] = True
+ logger.warn("%s occurrences in %s minutes reached the limit for %s. " +
+ "Will silently skip execution without warning till window is reset",
+ action_counter["count"], self.window_in_min, action_name)
+ else:
+ logger.debug("%s occurrences in %s minutes reached the limit for %s",
+ action_counter["count"], self.window_in_min, action_name)
+ else:
+ if action_counter["warnedThresholdReached"] == False:
+ action_counter["warnedThresholdReached"] = True
+ logger.warn("%s occurrences in agent life time reached the limit for %s. " +
+ "Will silently skip execution without warning till window is reset",
+ action_counter["lifetimeCount"], action_name)
+ else:
+ logger.debug("%s occurrences in agent life time reached the limit for %s",
+ action_counter["lifetimeCount"], action_name)
+ return False
+ pass
+
+
+ def _execute_action_chk_only(self, action_name):
+ """
+ _private_ implementation of [may] execute check only
+ """
+ action_counter = self.actions[action_name]
+ now = self._now_()
+ seconds_since_last_attempt = now - action_counter["lastAttempt"]
+
+ if action_counter["lifetimeCount"] < self.max_lifetime_count:
+ if action_counter["count"] < self.max_count:
+ if seconds_since_last_attempt > self.retry_gap_in_sec:
+ return True
+ else:
+ sec_since_last_reset = now - action_counter["lastReset"]
+ if sec_since_last_reset > self.window_in_sec:
+ return True
+
+ return False
+ pass
+
+ def _now_(self):
+ return int(time.time())
+ pass
+
+
+ def update_configuration_from_registration(self, reg_resp):
+ """
+ TODO: Server sends the recovery configuration - call update_config after parsing
+ "recoveryConfig": {
+ "type" : "DEFAULT|AUTO_START|FULL",
+ "maxCount" : 10,
+ "windowInMinutes" : 60,
+ "retryGap" : 0 }
+ """
+
+ recovery_enabled = False
+ auto_start_only = True
+ max_count = 6
+ window_in_min = 60
+ retry_gap = 5
+ max_lifetime_count = 12
+
+ if reg_resp and "recoveryConfig" in reg_resp:
+ config = reg_resp["recoveryConfig"]
+ if "type" in config:
+ if config["type"] in ["AUTO_START", "FULL"]:
+ recovery_enabled = True
+ if config["type"] == "FULL":
+ auto_start_only = False
+ if "maxCount" in config:
+ max_count = self._read_int_(config["maxCount"], max_count)
+ if "windowInMinutes" in config:
+ window_in_min = self._read_int_(config["windowInMinutes"], window_in_min)
+ if "retryGap" in config:
+ retry_gap = self._read_int_(config["retryGap"], retry_gap)
+ if 'maxLifetimeCount' in config:
+ max_lifetime_count = self._read_int_(config['maxLifetimeCount'], max_lifetime_count)
+ self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only)
+ pass
+
+
+ def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only):
+ """
+ Update recovery configuration, recovery is disabled if configuration values
+ are not correct
+ """
+ self.recovery_enabled = False;
+ if max_count <= 0:
+ logger.warn("Recovery disabled: max_count must be a non-negative number")
+ return
+
+ if window_in_min <= 0:
+ logger.warn("Recovery disabled: window_in_min must be a non-negative number")
+ return
+
+ if retry_gap < 1:
+ logger.warn("Recovery disabled: retry_gap must be a positive number and at least 1")
+ return
+ if retry_gap >= window_in_min:
+ logger.warn("Recovery disabled: retry_gap must be smaller than window_in_min")
+ return
+ if max_lifetime_count < 0 or max_lifetime_count < max_count:
+ logger.warn("Recovery disabled: max_lifetime_count must more than 0 and >= max_count")
+ return
+
+ self.max_count = max_count
+ self.window_in_min = window_in_min
+ self.retry_gap = retry_gap
+ self.window_in_sec = window_in_min * 60
+ self.retry_gap_in_sec = retry_gap * 60
+ self.auto_start_only = auto_start_only
+ self.max_lifetime_count = max_lifetime_count
+
+ self.allowed_desired_states = [self.STARTED, self.INSTALLED]
+ self.allowed_current_states = [self.INIT, self.INSTALLED]
+
+ if self.auto_start_only:
+ self.allowed_desired_states = [self.STARTED]
+ self.allowed_current_states = [self.INSTALLED]
+
+ self.recovery_enabled = recovery_enabled
+ if self.recovery_enabled:
+ logger.info(
+ "==> Auto recovery is enabled with maximum %s in %s minutes with gap of %s minutes between and lifetime max being %s.",
+ self.max_count, self.window_in_min, self.retry_gap, self.max_lifetime_count)
+ pass
+
+
+ def get_unique_task_id(self):
+ self.id += 1
+ return self.id
+ pass
+
+
+ def process_status_commands(self, commands):
+ if not self.enabled():
+ return
+
+ if commands and len(commands) > 0:
+ for command in commands:
+ self.store_or_update_command(command)
+ if self.EXECUTION_COMMAND_DETAILS in command:
+ logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS]))
+
+ pass
+
+
+ def process_execution_commands(self, commands):
+ if not self.enabled():
+ return
+
+ if commands and len(commands) > 0:
+ for command in commands:
+ if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND:
+ if self.ROLE in command:
+ if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL:
+ self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS)
+ if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START:
+ self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS)
+ pass
+
+
+ def store_or_update_command(self, command):
+ """
+ Stores command details by reading them from the STATUS_COMMAND
+ Update desired state as well
+ """
+ if not self.enabled():
+ return
+
+ logger.debug("Inspecting command to store/update details")
+ if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND:
+ payloadLevel = self.PAYLOAD_LEVEL_DEFAULT
+ if self.PAYLOAD_LEVEL in command:
+ payloadLevel = command[self.PAYLOAD_LEVEL]
+
+ component = command[self.COMPONENT_NAME]
+ self.update_desired_status(component, command[self.DESIRED_STATE])
+
+ if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND:
+ if self.EXECUTION_COMMAND_DETAILS in command:
+ # Store the execution command details
+ self.remove_command(component)
+ self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS]
+ logger.debug("Stored command details for " + component)
+ else:
+ logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.")
+ pass
+ pass
+
+
+ def get_install_command(self, component):
+ if self.enabled():
+ logger.debug("Using stored INSTALL command for %s", component)
+ if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+ command = copy.deepcopy(self.stored_exec_commands[component])
+ command[self.ROLE_COMMAND] = "INSTALL"
+ command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+ command[self.TASK_ID] = self.get_unique_task_id()
+ return command
+ else:
+ logger.info("INSTALL command cannot be computed.")
+ else:
+ logger.info("Recovery is not enabled. INSTALL command will not be computed.")
+ return None
+ pass
+
+
+ def get_start_command(self, component):
+ if self.enabled():
+ logger.debug("Using stored START command for %s", component)
+ if self.command_exists(component, ActionQueue.EXECUTION_COMMAND):
+ command = copy.deepcopy(self.stored_exec_commands[component])
+ command[self.ROLE_COMMAND] = "START"
+ command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND
+ command[self.TASK_ID] = self.get_unique_task_id()
+ return command
+ else:
+ logger.info("START command cannot be computed.")
+ else:
+ logger.info("Recovery is not enabled. START command will not be computed.")
+
+ return None
+ pass
+
+
+ def command_exists(self, component, command_type):
+ if command_type == ActionQueue.EXECUTION_COMMAND:
+ if component in self.stored_exec_commands:
+ return True
+
+ return False
+ pass
+
+
+ def remove_command(self, component):
+ if component in self.stored_exec_commands:
+ del self.stored_exec_commands[component]
+ return True
+ return False
+
+
+ def _read_int_(self, value, default_value=0):
+ int_value = default_value
+ try:
+ int_value = int(value)
+ except ValueError:
+ pass
+ return int_value
+
+
+def main(argv=None):
+ cmd_mgr = RecoveryManager()
+ pass
+
+
+if __name__ == '__main__':
+ main()
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 6e445c1..b9cbbe0 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -35,6 +35,7 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
from ambari_agent.PythonExecutor import PythonExecutor
from ambari_agent.CommandStatusDict import CommandStatusDict
from ambari_agent.ActualConfigHandler import ActualConfigHandler
+from ambari_agent.RecoveryManager import RecoveryManager
from FileCache import FileCache
from ambari_commons import OSCheck
from only_for_platform import only_for_platform, get_platform, not_for_platform, PLATFORM_LINUX, PLATFORM_WINDOWS
@@ -544,6 +545,8 @@ class TestActionQueue(TestCase):
build_mock.return_value = {'dummy report': '' }
+ dummy_controller.recovery_manager = RecoveryManager()
+
requestComponentStatus_mock.reset_mock()
requestComponentStatus_mock.return_value = {'exitcode': 0 }
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 76470b3..f379fbf 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -598,6 +598,112 @@ class TestController(unittest.TestCase):
self.assertEquals(LiveStatus_mock.CLIENT_COMPONENTS, client_components_expected)
self.assertEquals(LiveStatus_mock.COMPONENTS, components_expected)
+ @patch("socket.gethostbyname")
+ @patch("json.dumps")
+ @patch("time.sleep")
+ @patch("pprint.pformat")
+ @patch.object(Controller, "randint")
+ @patch.object(Controller, "LiveStatus")
+ def test_recoveryRegConfig(self, LiveStatus_mock, randintMock, pformatMock, sleepMock,
+ dumpsMock, socketGhbnMock):
+ """
+ The "recoveryConfig" section of a registration response must be applied to
+ the controller's recovery manager.
+ """
+ # Values the recovery manager holds before any registration response.
+ self.assertEquals(self.controller.recovery_manager.recovery_enabled, False)
+ self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+ self.assertEquals(self.controller.recovery_manager.max_count, 6)
+ self.assertEquals(self.controller.recovery_manager.window_in_min, 60)
+ self.assertEquals(self.controller.recovery_manager.retry_gap, 5)
+
+ # Capture stdout for the duration of the test.
+ out = StringIO.StringIO()
+ sys.stdout = out
+
+
+ dumpsMock.return_value = '{"valid_object": true}'
+ socketGhbnMock.return_value = "host1"
+
+ sendRequest = MagicMock(name="sendRequest")
+ self.controller.sendRequest = sendRequest
+
+ register = MagicMock(name="register")
+ self.controller.register = register
+
+ # Registration response carrying a FULL recovery configuration.
+ sendRequest.return_value = {
+ "responseId": 1,
+ "recoveryConfig": {
+ "type": "FULL",
+ "maxCount": 5,
+ "windowInMinutes": 50,
+ "retryGap": 3,
+ "maxLifetimeCount": 7},
+ "log": "", "exitstatus": "0"}
+
+ self.controller.isRegistered = False
+ self.controller.registerWithServer()
+
+ # Every setting must now reflect the response above.
+ self.assertEquals(self.controller.recovery_manager.recovery_enabled, True)
+ self.assertEquals(self.controller.recovery_manager.auto_start_only, False)
+ self.assertEquals(self.controller.recovery_manager.max_count, 5)
+ self.assertEquals(self.controller.recovery_manager.window_in_min, 50)
+ self.assertEquals(self.controller.recovery_manager.retry_gap, 3)
+ self.assertEquals(self.controller.recovery_manager.max_lifetime_count, 7)
+
+ sys.stdout = sys.__stdout__
+
+ self.controller.sendRequest = Controller.Controller.sendRequest
+ # NOTE(review): addToStatusQueue is not replaced anywhere in this test, so
+ # this "restore" looks unnecessary - confirm before relying on it.
+ self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
+ pass
+
+ @patch.object(threading._Event, "wait")
+ @patch("time.sleep")
+ @patch("json.dumps")
+ def test_recoveryHbCmd(self, dumpsMock, sleepMock, event_mock):
+ """
+ The execution and status commands of a heartbeat response must be handed to
+ the recovery manager.
+ """
+
+ # Capture stdout for the duration of the test.
+ out = StringIO.StringIO()
+ sys.stdout = out
+
+ heartbeat = MagicMock()
+ self.controller.heartbeat = heartbeat
+ event_mock.return_value = False
+ dumpsMock.return_value = "data"
+
+ sendRequest = MagicMock(name="sendRequest")
+ self.controller.sendRequest = sendRequest
+ # The queue mocks must be installed on the controller, the object under
+ # test; setting them on the test case would leave the real methods in place.
+ addToQueue = MagicMock(name="addToQueue")
+ addToStatusQueue = MagicMock(name="addToStatusQueue")
+ self.controller.addToQueue = addToQueue
+ self.controller.addToStatusQueue = addToStatusQueue
+
+ process_execution_commands = MagicMock(name="process_execution_commands")
+ self.controller.recovery_manager.process_execution_commands = process_execution_commands
+ process_status_commands = MagicMock(name="process_status_commands")
+ self.controller.recovery_manager.process_status_commands = process_status_commands
+
+ self.controller.responseId = 0
+ response = {"responseId":1, "statusCommands": "commands2", "executionCommands" : "commands1", "log":"", "exitstatus":"0"}
+ sendRequest.return_value = response
+
+ # Stop the heartbeat loop as soon as the first request has been answered.
+ def one_heartbeat(*args, **kwargs):
+ self.controller.DEBUG_STOP_HEARTBEATING = True
+ return response
+
+ sendRequest.side_effect = one_heartbeat
+
+ actionQueue = MagicMock()
+ actionQueue.isIdle.return_value = True
+
+ # one successful request, after stop
+ self.controller.actionQueue = actionQueue
+ self.controller.heartbeatWithServer()
+ self.assertTrue(sendRequest.called)
+ self.assertTrue(process_execution_commands.called)
+ self.assertTrue(process_status_commands.called)
+ process_execution_commands.assert_called_with("commands1")
+ process_status_commands.assert_called_with("commands2")
+
+ self.controller.heartbeatWithServer()
+ sys.stdout = sys.__stdout__
+ # Restore each attribute that was replaced above.
+ self.controller.sendRequest = Controller.Controller.sendRequest
+ self.controller.addToQueue = Controller.Controller.addToQueue
+ self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue
+ pass
if __name__ == "__main__":
unittest.main(verbosity=2)
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
new file mode 100644
index 0000000..4cc2c0b
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py
@@ -0,0 +1,430 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+from unittest import TestCase
+import copy
+
+from ambari_agent.RecoveryManager import RecoveryManager
+from mock.mock import patch, MagicMock, call
+
+
+class TestRecoveryManager(TestCase):
+ command = {
+ "commandType": "STATUS_COMMAND",
+ "payloadLevel": "EXECUTION_COMMAND",
+ "componentName": "NODEMANAGER",
+ "desiredState": "STARTED",
+ "executionCommandDetails": {
+ "commandType": "EXECUTION_COMMAND",
+ "roleCommand": "INSTALL",
+ "role": "NODEMANAGER",
+ "configurations": {
+ "capacity-scheduler": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "capacity-calculator": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "commandParams": {
+ "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+ }
+ }
+ }
+ }
+
+ exec_command1 = {
+ "commandType": "EXECUTION_COMMAND",
+ "roleCommand": "INSTALL",
+ "role": "NODEMANAGER",
+ "configurations": {
+ "capacity-scheduler": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "capacity-calculator": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "commandParams": {
+ "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+ }
+ }
+ }
+
+ exec_command2 = {
+ "commandType": "EXECUTION_COMMAND",
+ "roleCommand": "START",
+ "role": "NODEMANAGER",
+ "configurations": {
+ "capacity-scheduler": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "capacity-calculator": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "commandParams": {
+ "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+ }
+ }
+ }
+
+ exec_command3 = {
+ "commandType": "EXECUTION_COMMAND",
+ "roleCommand": "SERVICE_CHECK",
+ "role": "NODEMANAGER",
+ "configurations": {
+ "capacity-scheduler": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "capacity-calculator": {
+ "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"},
+ "commandParams": {
+ "service_package_folder": "common-services/YARN/2.1.0.2.0/package"
+ }
+ }
+ }
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ @patch.object(RecoveryManager, "update_desired_status")
+ def test_process_commands(self, mock_uds):
+ """
+ Status and execution commands must be translated into
+ update_desired_status() calls on an enabled RecoveryManager.
+ """
+ rm = RecoveryManager(True)
+ # Empty input must not touch the desired status.
+ rm.process_status_commands(None)
+ self.assertFalse(mock_uds.called)
+
+ rm.process_status_commands([])
+ self.assertFalse(mock_uds.called)
+
+ rm.process_status_commands([self.command])
+ mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED")])
+
+ mock_uds.reset_mock()
+
+ # All expected calls go into ONE list: the second positional argument of
+ # assert_has_calls is any_order, not a further list of calls.
+ rm.process_status_commands([self.command, self.exec_command1, self.command])
+ mock_uds.assert_has_calls([call("NODEMANAGER", "STARTED"), call("NODEMANAGER", "STARTED")])
+
+ mock_uds.reset_mock()
+
+ rm.process_execution_commands([self.exec_command1, self.exec_command2, self.exec_command3])
+ mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED"), call("NODEMANAGER", "STARTED")])
+
+ mock_uds.reset_mock()
+
+ rm.process_execution_commands([self.exec_command1, self.command])
+ mock_uds.assert_has_calls([call("NODEMANAGER", "INSTALLED")])
+ pass
+
+ def test_defaults(self):
+ """
+ A RecoveryManager built without arguments is disabled: it yields no
+ INSTALL/START commands and never reports that recovery is required.
+ """
+ rm = RecoveryManager()
+ self.assertFalse(rm.enabled())
+ self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+ self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+ # Even a current/desired mismatch must not trigger recovery while disabled.
+ rm.update_current_status("NODEMANAGER", "INSTALLED")
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+ pass
+
+ @patch.object(RecoveryManager, "_now_")
+ def test_sliding_window(self, time_mock):
+ """
+ Exercises update_config() validation and the may_execute()/execute() limits.
+
+ _now_ is patched with a scripted list of timestamps (seconds); the list has
+ exactly one entry for each may_execute()/execute() call made below, as the
+ "T = ..." markers indicate. The update_config() arguments appear to be
+ (max_count, window_in_min, retry_gap, max_lifetime_count, enabled,
+ auto_start_only), going by the mapping asserted in test_update_rm_config.
+ """
+ time_mock.side_effect = \
+ [1000, 1001, 1002, 1003, 1004, 1071, 1150, 1151, 1152, 1153, 1400, 1401,
+ 1500, 1571, 1572, 1653, 1900, 1971, 2300, 2301]
+
+ rm = RecoveryManager(True, False)
+ self.assertTrue(rm.enabled())
+
+ # Configurations asserted below to disable (or re-enable) recovery.
+ rm.update_config(0, 60, 5, 12, True, False)
+ self.assertFalse(rm.enabled())
+
+ rm.update_config(6, 60, 5, 12, True, False)
+ self.assertTrue(rm.enabled())
+
+ rm.update_config(6, 0, 5, 12, True, False)
+ self.assertFalse(rm.enabled())
+
+ rm.update_config(6, 60, 0, 12, True, False)
+ self.assertFalse(rm.enabled())
+
+ rm.update_config(6, 60, 1, 12, True, False)
+ self.assertTrue(rm.enabled())
+
+ rm.update_config(6, 60, 61, 12, True, False)
+ self.assertFalse(rm.enabled())
+
+ rm.update_config(6, 60, 5, 0, True, False)
+ self.assertFalse(rm.enabled())
+
+ rm.update_config(6, 60, 5, 4, True, False)
+ self.assertFalse(rm.enabled())
+
+ # maximum 2 attempts per 5 minute window, at least 1 minute between
+ # attempts, lifetime limit of 4
+ rm.update_config(2, 5, 1, 4, True, False)
+ self.assertTrue(rm.enabled())
+
+ # T = 1000-2
+ self.assertTrue(rm.may_execute("NODEMANAGER"))
+ self.assertTrue(rm.may_execute("NODEMANAGER"))
+ self.assertTrue(rm.may_execute("NODEMANAGER"))
+
+ # T = 1003-4
+ self.assertTrue(rm.execute("NODEMANAGER"))
+ self.assertFalse(rm.execute("NODEMANAGER")) # too soon
+
+ # T = 1071
+ self.assertTrue(rm.execute("NODEMANAGER")) # 60+ seconds passed
+
+ # T = 1150-3
+ self.assertFalse(rm.execute("NODEMANAGER")) # limit 2 exceeded
+ self.assertFalse(rm.may_execute("NODEMANAGER"))
+ self.assertTrue(rm.execute("DATANODE"))
+ self.assertTrue(rm.may_execute("NAMENODE"))
+
+ # T = 1400-1
+ self.assertTrue(rm.execute("NODEMANAGER")) # windows reset
+ self.assertFalse(rm.may_execute("NODEMANAGER")) # too soon
+
+ # maximum 2 attempts per 5 minute window, 1 minute between attempts,
+ # lifetime limit of 5
+ rm.update_config(2, 5, 1, 5, True, True)
+
+ # T = 1500-3
+ self.assertTrue(rm.execute("NODEMANAGER2"))
+ self.assertTrue(rm.may_execute("NODEMANAGER2"))
+ self.assertTrue(rm.execute("NODEMANAGER2"))
+ self.assertFalse(rm.execute("NODEMANAGER2")) # max limit
+
+ # T = 1900-2
+ self.assertTrue(rm.execute("NODEMANAGER2"))
+ self.assertTrue(rm.execute("NODEMANAGER2"))
+
+ # T = 2300-2
+ # lifetime max reached
+ self.assertTrue(rm.execute("NODEMANAGER2"))
+ self.assertFalse(rm.execute("NODEMANAGER2"))
+ pass
+
+ def test_recovery_required(self):
+ """
+ requires_recovery() for combinations of current and desired component state,
+ first with RecoveryManager(True, False) and then with
+ RecoveryManager(True, True) (second flag presumably auto_start_only).
+ """
+ rm = RecoveryManager(True, False)
+
+ rm.update_current_status("NODEMANAGER", "INSTALLED")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+ self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_current_status("NODEMANAGER", "STARTED")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ # Unrecognized or empty desired states never require recovery.
+ rm.update_current_status("NODEMANAGER", "INSTALLED")
+ rm.update_desired_status("NODEMANAGER", "XYS")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_desired_status("NODEMANAGER", "")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+ self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+ self.assertTrue(rm.requires_recovery("NODEMANAGER"))
+
+ rm = RecoveryManager(True, True)
+
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ # NOTE(review): the desired state below is "START", while the rest of this
+ # test uses "STARTED". An unrecognized desired state already yields False
+ # (see "XYS" above), so confirm whether "STARTED" was intended here.
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "START")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+
+ rm.update_current_status("NODEMANAGER", "INSTALLED")
+ rm.update_desired_status("NODEMANAGER", "START")
+ self.assertFalse(rm.requires_recovery("NODEMANAGER"))
+ pass
+
+ @patch('time.time', MagicMock(return_value=1))
+ def test_store_from_status_and_use(self):
+ """
+ A command stored from a status command can be turned into INSTALL and START
+ recovery commands, and is gone again after remove_command().
+
+ time.time() is pinned to the constant 1 through return_value. MagicMock
+ accepts arbitrary keyword arguments as plain attributes, so a misspelled
+ keyword would be silently ignored; a one-element side_effect list would
+ instead raise on the second call.
+ """
+ rm = RecoveryManager(True)
+
+ # Work on a copy so the shared class-level fixture is never modified.
+ command1 = copy.deepcopy(self.command)
+
+ rm.store_or_update_command(command1)
+ self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND"))
+
+ install_command = rm.get_install_command("NODEMANAGER")
+ start_command = rm.get_start_command("NODEMANAGER")
+
+ self.assertEqual("INSTALL", install_command["roleCommand"])
+ self.assertEqual("START", start_command["roleCommand"])
+ self.assertEqual("AUTO_EXECUTION_COMMAND", install_command["commandType"])
+ self.assertEqual("AUTO_EXECUTION_COMMAND", start_command["commandType"])
+ self.assertEqual("NODEMANAGER", install_command["role"])
+ self.assertEqual("NODEMANAGER", start_command["role"])
+ self.assertEquals(install_command["configurations"], start_command["configurations"])
+
+ # Each generated command receives its own, increasing task id.
+ self.assertEqual(2, install_command["taskId"])
+ self.assertEqual(3, start_command["taskId"])
+
+ self.assertEqual(None, rm.get_install_command("component2"))
+ self.assertEqual(None, rm.get_start_command("component2"))
+
+ self.assertTrue(rm.remove_command("NODEMANAGER"))
+ self.assertFalse(rm.remove_command("NODEMANAGER"))
+
+ self.assertEqual(None, rm.get_install_command("NODEMANAGER"))
+ self.assertEqual(None, rm.get_start_command("NODEMANAGER"))
+
+ self.assertEqual(None, rm.get_install_command("component2"))
+ self.assertEqual(None, rm.get_start_command("component2"))
+
+ pass
+
+ @patch.object(RecoveryManager, "_now_")
+ def test_get_recovery_commands(self, time_mock):
+ """
+ get_recovery_commands() must return the command matching the gap between
+ current and desired state. _now_ is patched with timestamps spaced 1000
+ seconds apart.
+ """
+ time_mock.side_effect = \
+ [1000, 2000, 3000, 4000, 5000, 6000]
+ rm = RecoveryManager(True)
+ rm.update_config(10, 5, 1, 11, True, False)
+
+ command1 = copy.deepcopy(self.command)
+
+ rm.store_or_update_command(command1)
+
+ # INSTALLED -> STARTED: one START command.
+ rm.update_current_status("NODEMANAGER", "INSTALLED")
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+
+ commands = rm.get_recovery_commands()
+ self.assertEqual(1, len(commands))
+ self.assertEqual("START", commands[0]["roleCommand"])
+
+ # INIT -> STARTED: one INSTALL command.
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "STARTED")
+
+ commands = rm.get_recovery_commands()
+ self.assertEqual(1, len(commands))
+ self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+ # INIT -> INSTALLED: one INSTALL command.
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+ commands = rm.get_recovery_commands()
+ self.assertEqual(1, len(commands))
+ self.assertEqual("INSTALL", commands[0]["roleCommand"])
+
+ # After reconfiguring with the last flag set to True (presumably
+ # auto_start_only), INIT -> INSTALLED yields no command.
+ rm.update_config(2, 5, 1, 5, True, True)
+ rm.update_current_status("NODEMANAGER", "INIT")
+ rm.update_desired_status("NODEMANAGER", "INSTALLED")
+
+ commands = rm.get_recovery_commands()
+ self.assertEqual(0, len(commands))
+ pass
+
+ @patch.object(RecoveryManager, "update_config")
+ def test_update_rm_config(self, mock_uc):
+ """
+ update_configuration_from_registration() must translate the "recoveryConfig"
+ section of a registration response into an update_config() call.
+
+ Going by the last case, the positional arguments correspond to maxCount,
+ windowInMinutes, retryGap and maxLifetimeCount, followed by two booleans
+ derived from "type".
+ """
+ rm = RecoveryManager()
+ # Missing or empty responses fall back to the defaults.
+ rm.update_configuration_from_registration(None)
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration({})
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration(
+ {"recoveryConfig": {
+ "type" : "DEFAULT"}}
+ )
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True)])
+
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration(
+ {"recoveryConfig": {
+ "type" : "FULL"}}
+ )
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False)])
+
+ # NOTE(review): the key below is "max_count", not "maxCount" as in the last
+ # case, so this may exercise a missing key rather than a non-numeric value -
+ # confirm which was intended.
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration(
+ {"recoveryConfig": {
+ "type" : "AUTO_START",
+ "max_count" : "med"}}
+ )
+ mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True)])
+
+ # Numeric values may arrive as strings ("5") or as numbers.
+ mock_uc.reset_mock()
+ rm.update_configuration_from_registration(
+ {"recoveryConfig": {
+ "type" : "AUTO_START",
+ "maxCount" : "5",
+ "windowInMinutes" : 20,
+ "retryGap" : 2,
+ "maxLifetimeCount" : 5}}
+ )
+ mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True)])
+ pass
+
+ @patch.object(RecoveryManager, "_now_")
+ def test_recovery_report(self, time_mock):
+ """
+ get_recovery_status() must report the summary and the per-component attempt
+ counts as execute() is called. _now_ is patched with one scripted timestamp
+ for each execute() call below.
+ """
+ time_mock.side_effect = \
+ [1000, 1071, 1072, 1470, 1471, 1472, 1543, 1644, 1715]
+
+ rm = RecoveryManager()
+ rec_st = rm.get_recovery_status()
+ self.assertEquals(rec_st, {"summary": "DISABLED"})
+
+ rm.update_config(2, 5, 1, 4, True, True)
+ rec_st = rm.get_recovery_status()
+ self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []})
+
+ rm.execute("PUMA")
+ rec_st = rm.get_recovery_status()
+ self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+ "componentReports": [{"name": "PUMA", "numAttempts": 1, "limitReached": False}]})
+ rm.execute("PUMA")
+ rm.execute("LION")
+
+ rec_st = rm.get_recovery_status()
+ self.assertEquals(rec_st, {"summary": "RECOVERABLE",
+ "componentReports": [
+ {"name": "LION", "numAttempts": 1, "limitReached": False},
+ {"name": "PUMA", "numAttempts": 2, "limitReached": False}
+ ]})
+ rm.execute("PUMA")
+ rm.execute("LION")
+ rm.execute("PUMA")
+ rm.execute("PUMA")
+ rm.execute("LION")
+ rec_st = rm.get_recovery_status()
+ # One component at its limit: the summary becomes PARTIALLY_RECOVERABLE.
+ self.assertEquals(rec_st, {"summary": "PARTIALLY_RECOVERABLE",
+ "componentReports": [
+ {"name": "LION", "numAttempts": 3, "limitReached": False},
+ {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+ ]})
+
+ rm.execute("LION")
+ rec_st = rm.get_recovery_status()
+ # Every component at its limit: the summary becomes UNRECOVERABLE.
+ self.assertEquals(rec_st, {"summary": "UNRECOVERABLE",
+ "componentReports": [
+ {"name": "LION", "numAttempts": 4, "limitReached": True},
+ {"name": "PUMA", "numAttempts": 4, "limitReached": True}
+ ]})
+ pass
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
index 79d8925..3aaf23c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ExecutionCommandWrapper.java
@@ -100,9 +100,9 @@ public class ExecutionCommandWrapper {
Map<String, Map<String, Map<String, String>>> configAttributes = configHelper.getEffectiveConfigAttributes(cluster,
executionCommand.getConfigurationTags());
- for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurance : configAttributes.entrySet()) {
- String type = attributesOccurance.getKey();
- Map<String, Map<String, String>> attributes = attributesOccurance.getValue();
+ for (Map.Entry<String, Map<String, Map<String, String>>> attributesOccurrence : configAttributes.entrySet()) {
+ String type = attributesOccurrence.getKey();
+ Map<String, Map<String, String>> attributes = attributesOccurrence.getValue();
if (executionCommand.getConfigurationAttributes() != null) {
if (!executionCommand.getConfigurationAttributes().containsKey(type)) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
new file mode 100644
index 0000000..2980f38
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentRequests.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Records, per host and component, whether the agent has requested execution
+ * command details from the server.
+ *
+ * All reads and writes of the underlying maps are guarded by a single lock, so
+ * an instance can be shared between threads.
+ */
+
+@Singleton
+public class AgentRequests {
+ private static final Log LOG = LogFactory.getLog(AgentRequests.class);
+ private final Map<String, Map<String, Boolean>> requiresExecCmdDetails = new HashMap<String, Map<String, Boolean>>();
+ private final Object _lock = new Object();
+
+ /**
+ * Creates a holder for agent requests
+ */
+ public AgentRequests() {
+ }
+
+ /**
+ * Stores the request flag reported for a component on a host.
+ *
+ * @param host the host name
+ * @param component the component name
+ * @param requestExecutionCmd "true" (any case) to request execution command
+ * details, any other non-blank value to withdraw
+ * the request; blank or null values are ignored
+ */
+ public void setExecutionDetailsRequest(String host, String component, String requestExecutionCmd) {
+ if (StringUtils.isNotBlank(requestExecutionCmd)) {
+ LOG.debug("Setting need for exec command to " + requestExecutionCmd + " for " + component);
+ // Boolean.valueOf is a case-insensitive comparison against "true".
+ Boolean required = Boolean.valueOf(requestExecutionCmd);
+ synchronized (_lock) {
+ getPerHostRequiresExecCmdDetails(host).put(component, required);
+ }
+ }
+ }
+
+ /**
+ * Tells whether execution command details were requested for a component.
+ *
+ * @param host the host name
+ * @param component the component name
+ * @return the stored flag, or {@link Boolean#FALSE} when nothing has been
+ * recorded for the host/component pair
+ */
+ public Boolean shouldSendExecutionDetails(String host, String component) {
+ synchronized (_lock) {
+ // Plain lookup: a query must not create an entry for an unknown host.
+ Map<String, Boolean> perHostRequiresExecCmdDetails = requiresExecCmdDetails.get(host);
+ if (perHostRequiresExecCmdDetails != null && perHostRequiresExecCmdDetails.containsKey(component)) {
+ LOG.debug("Sending exec command details for " + component);
+ return perHostRequiresExecCmdDetails.get(component);
+ }
+ }
+
+ return Boolean.FALSE;
+ }
+
+ /**
+ * Returns the per-component map of a host, creating it on first use.
+ * The caller must hold {@code _lock}.
+ */
+ private Map<String, Boolean> getPerHostRequiresExecCmdDetails(String host) {
+ Map<String, Boolean> perHostRequiresExecCmdDetails = requiresExecCmdDetails.get(host);
+ if (perHostRequiresExecCmdDetails == null) {
+ perHostRequiresExecCmdDetails = new HashMap<String, Boolean>();
+ requiresExecCmdDetails.put(host, perHostRequiresExecCmdDetails);
+ }
+
+ return perHostRequiresExecCmdDetails;
+ }
+
+ @Override
+ public String toString() {
+ synchronized (_lock) {
+ return new StringBuilder().append("requiresExecCmdDetails: ").append(requiresExecCmdDetails.toString()).toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
new file mode 100644
index 0000000..cf9272e
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentRecoveryReport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Recovery details of a single component: its name, the number of recovery
+ * attempts and whether the attempt limit has been reached.
+ *
+ * NOTE(review): the JSON property names below are snake_case
+ * ("num_attempts", "limit_reached"); confirm they match the keys the agent
+ * actually puts into its component reports.
+ */
+public class ComponentRecoveryReport {
+ private String name;
+ private int numAttempts;
+ private boolean limitReached;
+
+
+ /** @return the component name */
+ @JsonProperty("name")
+ public String getName() {
+ return name;
+ }
+
+ /** @param name the component name */
+ @JsonProperty("name")
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /** @return the number of recovery attempts */
+ @JsonProperty("num_attempts")
+ public int getNumAttempts() {
+ return numAttempts;
+ }
+
+ /** @param numAttempts the number of recovery attempts */
+ @JsonProperty("num_attempts")
+ public void setNumAttempts(int numAttempts) {
+ this.numAttempts = numAttempts;
+ }
+
+ /** @return whether the attempt limit has been reached */
+ @JsonProperty("limit_reached")
+ public boolean getLimitReached() {
+ return limitReached;
+ }
+
+ /** @param limitReached whether the attempt limit has been reached */
+ @JsonProperty("limit_reached")
+ public void setLimitReached(boolean limitReached) {
+ this.limitReached = limitReached;
+ }
+
+ @Override
+ public String toString() {
+ // Each label names the field it prints.
+ return "ComponentRecoveryReport{" +
+ "name='" + name + '\'' +
+ ", numAttempts='" + numAttempts + '\'' +
+ ", limitReached='" + limitReached + '\'' +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
index 57f25ff..5591ae8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ComponentStatus.java
@@ -34,6 +34,7 @@ public class ComponentStatus {
* @see org.apache.ambari.server.state.SecurityState
*/
private String securityState;
+ private String sendExecCmdDet = "False";
private String serviceName;
private String clusterName;
@@ -61,6 +62,14 @@ public class ComponentStatus {
return status;
}
+ /**
+ * Sets the flag, carried as a string, that says whether execution command
+ * details are requested for this component.
+ *
+ * @param sendExecCmdDet the flag value; the field defaults to "False"
+ */
+ public void setSendExecCmdDet(String sendExecCmdDet) {
+ this.sendExecCmdDet = sendExecCmdDet;
+ }
+
+ /**
+ * @return the string flag saying whether execution command details are
+ * requested for this component; "False" unless it has been set
+ */
+ public String getSendExecCmdDet() {
+ return this.sendExecCmdDet;
+ }
+
public void setStatus(String status) {
this.status = status;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
index 2f2688b..1430aa2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeat.java
@@ -41,6 +41,7 @@ public class HeartBeat {
HostStatus nodeStatus;
private AgentEnv agentEnv = null;
private List<Alert> alerts = null;
+ private RecoveryReport recoveryReport;
public long getResponseId() {
return responseId;
@@ -84,6 +85,14 @@ public class HeartBeat {
this.nodeStatus = nodeStatus;
}
+ /**
+ * @return the recovery report carried by this heartbeat, or null when none
+ * has been set
+ */
+ public RecoveryReport getRecoveryReport() {
+ return recoveryReport;
+ }
+
+ /**
+ * @param recoveryReport the recovery report to carry with this heartbeat
+ */
+ public void setRecoveryReport(RecoveryReport recoveryReport) {
+ this.recoveryReport = recoveryReport;
+ }
+
public AgentEnv getAgentEnv() {
return agentEnv;
}
@@ -129,6 +138,7 @@ public class HeartBeat {
", reports=" + reports +
", componentStatus=" + componentStatus +
", nodeStatus=" + nodeStatus +
+ ", recoveryReport=" + recoveryReport +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 51d6bc1..596525b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -156,6 +156,7 @@ public class HeartBeatHandler {
@Inject
private VersionEventPublisher versionEventPublisher;
+
/**
* KerberosPrincipalHostDAO used to set and get Kerberos principal details
*/
@@ -245,6 +246,11 @@ public class HeartBeatHandler {
}
}
+ if (heartbeat.getRecoveryReport() != null) {
+ RecoveryReport rr = heartbeat.getRecoveryReport();
+ processRecoveryReport(rr, hostname);
+ }
+
try {
if (heartbeat.getNodeStatus().getStatus().equals(HostStatus.Status.HEALTHY)) {
hostObject.handleEvent(new HostHealthyHeartbeatEvent(hostname, now,
@@ -254,7 +260,7 @@ public class HeartBeatHandler {
null));
}
} catch (InvalidStateTransitionException ex) {
- LOG.warn("Asking agent to reregister due to " + ex.getMessage(), ex);
+ LOG.warn("Asking agent to re-register due to " + ex.getMessage(), ex);
hostObject.setState(HostState.INIT);
return createRegisterCommand();
}
@@ -262,7 +268,7 @@ public class HeartBeatHandler {
// Examine heartbeat for command reports
processCommandReports(heartbeat, hostname, clusterFsm, now);
- // Examine heartbeart for component live status reports
+ // Examine heartbeat for component live status reports
processStatusReports(heartbeat, hostname, clusterFsm);
// Calculate host status
@@ -317,6 +323,12 @@ public class HeartBeatHandler {
}
}
+ /**
+ * Logs the recovery report at debug level and stores it on the host that
+ * sent it.
+ *
+ * @param recoveryReport the report taken from the heartbeat; must not be null
+ * @param hostname the name of the reporting host
+ * @throws AmbariException presumably when the host lookup fails
+ */
+ protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
+ LOG.debug("Received recovery report: " + recoveryReport.toString());
+ Host host = clusterFsm.getHost(hostname);
+ host.setRecoveryReport(recoveryReport);
+ }
+
protected void processHostStatus(HeartBeat heartbeat, String hostname) throws AmbariException {
Host host = clusterFsm.getHost(hostname);
@@ -696,6 +708,9 @@ public class HeartBeatHandler {
" (" + e.getMessage() + ")");
}
}
+
+ this.heartbeatMonitor.getAgentRequests()
+ .setExecutionDetailsRequest(hostname, componentName, status.getSendExecCmdDet());
} else {
// TODO: What should be done otherwise?
}
@@ -941,6 +956,11 @@ public class HeartBeatHandler {
List<AlertDefinitionCommand> alertDefinitionCommands = getRegistrationAlertDefinitionCommands(hostname);
response.setAlertDefinitionCommands(alertDefinitionCommands);
+ response.setRecoveryConfig(RecoveryConfig.getRecoveryConfig(config));
+ if(response.getRecoveryConfig() != null) {
+ LOG.debug("Recovery configuration set to " + response.getRecoveryConfig().toString());
+ }
+
Long requestId = 0L;
hostResponseIds.put(hostname, requestId);
response.setResponseId(requestId);
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index d44e0a8..ad55c05 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -33,7 +33,9 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import com.google.inject.Inject;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.RoleCommand;
import org.apache.ambari.server.actionmanager.ActionManager;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
@@ -75,6 +77,7 @@ public class HeartbeatMonitor implements Runnable {
private final AmbariMetaInfo ambariMetaInfo;
private final AmbariManagementController ambariManagementController;
private final Configuration configuration;
+ private final AgentRequests agentRequests;
public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
int threadWakeupInterval, Injector injector) {
@@ -87,6 +90,7 @@ public class HeartbeatMonitor implements Runnable {
ambariManagementController = injector.getInstance(
AmbariManagementController.class);
configuration = injector.getInstance(Configuration.class);
+ agentRequests = new AgentRequests();
}
public void shutdown() {
@@ -106,6 +110,10 @@ public class HeartbeatMonitor implements Runnable {
return monitorThread.isAlive();
}
+ /**
+ * Returns the {@link AgentRequests} instance that this monitor creates in its constructor.
+ *
+ * @return the agent request holder owned by this monitor
+ */
+ public AgentRequests getAgentRequests() {
+ return this.agentRequests;
+ }
+
@Override
public void run() {
while (shouldRun) {
@@ -217,7 +225,7 @@ public class HeartbeatMonitor implements Runnable {
}
/**
- * Generates status command and fills all apropriate fields.
+ * Generates status command and fills all appropriate fields.
* @throws AmbariException
*/
private StatusCommand createStatusCommand(String hostname, Cluster cluster,
@@ -241,6 +249,10 @@ public class HeartbeatMonitor implements Runnable {
//Config clusterConfig = cluster.getDesiredConfigByType(GLOBAL);
Collection<Config> clusterConfigs = cluster.getAllConfigs();
+ // Apply global properties for this host from all config groups
+ Map<String, Map<String, String>> allConfigTags = configHelper
+ .getEffectiveDesiredTags(cluster, hostname);
+
for(Config clusterConfig: clusterConfigs) {
if(!clusterConfig.getType().endsWith("-env")) {
continue;
@@ -250,10 +262,6 @@ public class HeartbeatMonitor implements Runnable {
// cluster config for 'global'
Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
- // Apply global properties for this host from all config groups
- Map<String, Map<String, String>> allConfigTags = configHelper
- .getEffectiveDesiredTags(cluster, hostname);
-
Map<String, Map<String, String>> configTags = new HashMap<String,
Map<String, String>>();
@@ -294,6 +302,13 @@ public class HeartbeatMonitor implements Runnable {
statusCmd.setConfigurationAttributes(configurationAttributes);
statusCmd.setHostname(hostname);
+ // Always send the desired state; raise the payload level only when the agent asked for execution details
+ statusCmd.setDesiredState(sch.getDesiredState());
+ if (getAgentRequests().shouldSendExecutionDetails(hostname, componentName)) {
+ LOG.info(componentName + " is at " + sch.getState() + " adding more payload per agent ask");
+ statusCmd.setPayloadLevel(StatusCommand.StatusCommandPayload.EXECUTION_COMMAND);
+ }
+
// Fill command params
Map<String, String> commandParams = statusCmd.getCommandParams();
@@ -322,7 +337,13 @@ public class HeartbeatMonitor implements Runnable {
hostLevelParams.put(STACK_NAME, stackId.getStackName());
hostLevelParams.put(STACK_VERSION, stackId.getStackVersion());
+
+ if (statusCmd.getPayloadLevel() == StatusCommand.StatusCommandPayload.EXECUTION_COMMAND) {
+ ExecutionCommand ec = ambariManagementController.getExecutionCommand(cluster, sch, RoleCommand.START);
+ statusCmd.setExecutionCommand(ec);
+ LOG.debug(componentName + " has more payload for execution command");
+ }
+
return statusCmd;
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
new file mode 100644
index 0000000..9ebfb49
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryConfig.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent;
+
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.configuration.Configuration;
+
+
+/**
+ * Recovery config to be sent to the agent. Every setting is held as a string;
+ * {@link #getRecoveryConfig(Configuration)} fills them from the server
+ * {@link Configuration}.
+ */
+public class RecoveryConfig {
+
+  /**
+   * Creates an empty recovery config; every field starts out as {@code null}.
+   */
+  public RecoveryConfig() {
+  }
+
+  @SerializedName("type")
+  private String type;
+
+  @SerializedName("maxCount")
+  private String maxCount;
+
+  @SerializedName("windowInMinutes")
+  private String windowInMinutes;
+
+  @SerializedName("retryGap")
+  private String retryGap;
+
+  @SerializedName("maxLifetimeCount")
+  private String maxLifetimeCount;
+
+
+  /** @return the recovery type, or {@code null} if not set */
+  public String getType() {
+    return type;
+  }
+
+  /** @param type the recovery type to send */
+  public void setType(String type) {
+    this.type = type;
+  }
+
+  /** @return the max count setting, or {@code null} if not set */
+  public String getMaxCount() {
+    return maxCount;
+  }
+
+  /** @param maxCount the max count setting to send */
+  public void setMaxCount(String maxCount) {
+    this.maxCount = maxCount;
+  }
+
+  /** @return the window-in-minutes setting, or {@code null} if not set */
+  public String getWindowInMinutes() {
+    return windowInMinutes;
+  }
+
+  /** @param windowInMinutes the window-in-minutes setting to send */
+  public void setWindowInMinutes(String windowInMinutes) {
+    this.windowInMinutes = windowInMinutes;
+  }
+
+  /** @return the retry gap setting, or {@code null} if not set */
+  public String getRetryGap() {
+    return retryGap;
+  }
+
+  /** @param retryGap the retry gap setting to send */
+  public void setRetryGap(String retryGap) {
+    this.retryGap = retryGap;
+  }
+
+  /** @return the max lifetime count setting, or {@code null} if not set */
+  public String getMaxLifetimeCount() {
+    return maxLifetimeCount;
+  }
+
+  /** @param maxLifetimeCount the max lifetime count setting to send */
+  public void setMaxLifetimeCount(String maxLifetimeCount) {
+    this.maxLifetimeCount = maxLifetimeCount;
+  }
+
+  /**
+   * Builds a recovery config from the node-recovery getters of the server configuration.
+   *
+   * @param conf the server configuration to read from; must not be {@code null}
+   * @return a new {@code RecoveryConfig} populated from {@code conf}
+   */
+  public static RecoveryConfig getRecoveryConfig(Configuration conf) {
+    RecoveryConfig rc = new RecoveryConfig();
+    rc.setMaxCount(conf.getNodeRecoveryMaxCount());
+    rc.setMaxLifetimeCount(conf.getNodeRecoveryLifetimeMaxCount());
+    rc.setRetryGap(conf.getNodeRecoveryRetryGap());
+    rc.setType(conf.getNodeRecoveryType());
+    rc.setWindowInMinutes(conf.getNodeRecoveryWindowInMin());
+    return rc;
+  }
+
+  /**
+   * Renders all settings as {@code RecoveryConfig{type=..., maxCount=..., ...}}.
+   */
+  @Override
+  public String toString() {
+    StringBuilder buffer = new StringBuilder("RecoveryConfig{");
+    // The first field carries no separator; otherwise the output starts with "{, ".
+    buffer.append("type=").append(type);
+    buffer.append(", maxCount=").append(maxCount);
+    buffer.append(", windowInMinutes=").append(windowInMinutes);
+    buffer.append(", retryGap=").append(retryGap);
+    buffer.append(", maxLifetimeCount=").append(maxLifetimeCount);
+    buffer.append('}');
+    return buffer.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
new file mode 100644
index 0000000..f9d3e7c
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RecoveryReport.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * Recovery report carried in a heartbeat: a summary string plus a list of
+ * per-component recovery reports.
+ */
+public class RecoveryReport {
+
+  /**
+   * One of DISABLED, RECOVERABLE, UNRECOVERABLE, PARTIALLY_RECOVERABLE
+   */
+  private String summary = "DISABLED";
+  private List<ComponentRecoveryReport> componentReports = new ArrayList<ComponentRecoveryReport>();
+
+
+  /** @return the summary string; {@code "DISABLED"} until another value is set */
+  @JsonProperty("summary")
+  public String getSummary() {
+    return this.summary;
+  }
+
+  /** @param summary the summary string to store */
+  @JsonProperty("summary")
+  public void setSummary(String summary) {
+    this.summary = summary;
+  }
+
+  /** @return the per-component reports; an empty list until another value is set */
+  @JsonProperty("component_reports")
+  public List<ComponentRecoveryReport> getComponentReports() {
+    return this.componentReports;
+  }
+
+  /** @param componentReports the per-component reports to store */
+  @JsonProperty("component_reports")
+  public void setComponentReports(List<ComponentRecoveryReport> componentReports) {
+    this.componentReports = componentReports;
+  }
+
+  /**
+   * Renders the summary and the component reports; a {@code null} report list
+   * is printed as {@code []}.
+   */
+  @Override
+  public String toString() {
+    String renderedReports = (componentReports == null)
+        ? "[]"
+        : Arrays.toString(componentReports.toArray());
+    StringBuilder text = new StringBuilder("RecoveryReport{");
+    text.append("summary='").append(summary).append('\'');
+    text.append(", component_reports='").append(renderedReports);
+    text.append("'}");
+    return text.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
index 8a24560..8768a46 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
@@ -58,6 +58,9 @@ public class RegistrationResponse {
@JsonProperty("responseId")
private long responseId;
+ @JsonProperty("recoveryConfig")
+ private RecoveryConfig recoveryConfig;
+
@JsonProperty("statusCommands")
private List<StatusCommand> statusCommands = null;
@@ -81,7 +84,6 @@ public class RegistrationResponse {
* Gets the alert definition commands that contain the alert definitions for
* each cluster that the host is a member of.
*
- * @param commands
* the commands, or {@code null} for none.
*/
public List<AlertDefinitionCommand> getAlertDefinitionCommands() {
@@ -115,6 +117,14 @@ public class RegistrationResponse {
this.log = log;
}
+ /**
+ * Gets the recovery configuration carried by this registration response.
+ *
+ * @return the recovery configuration, or {@code null} if none was set
+ */
+ public RecoveryConfig getRecoveryConfig() {
+ return recoveryConfig;
+ }
+
+ /**
+ * Sets the recovery configuration carried by this registration response.
+ *
+ * @param recoveryConfig the recovery configuration to store
+ */
+ public void setRecoveryConfig(RecoveryConfig recoveryConfig) {
+ this.recoveryConfig = recoveryConfig;
+ }
+
@Override
public String toString() {
StringBuilder buffer = new StringBuilder("RegistrationResponse{");
@@ -122,6 +132,7 @@ public class RegistrationResponse {
buffer.append(", responseId=").append(responseId);
buffer.append(", statusCommands=").append(statusCommands);
buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
+ buffer.append(", recoveryConfig=").append(recoveryConfig);
buffer.append('}');
return buffer.toString();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dcbf12ef/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
index 6e08ef0..0e08ed8 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java
@@ -17,11 +17,12 @@
*/
package org.apache.ambari.server.agent;
+import com.google.gson.annotations.SerializedName;
+import org.apache.ambari.server.state.State;
+
import java.util.HashMap;
import java.util.Map;
-import com.google.gson.annotations.SerializedName;
-
/**
* Command to report the status of a list of services in roles.
*/
@@ -55,6 +56,39 @@ public class StatusCommand extends AgentCommand {
@SerializedName("hostname")
private String hostname = null;
+ @SerializedName("payloadLevel")
+ private StatusCommandPayload payloadLevel = StatusCommandPayload.DEFAULT;
+
+ @SerializedName("desiredState")
+ private State desiredState;
+
+ @SerializedName("executionCommandDetails")
+ private ExecutionCommand executionCommand;
+
+ /**
+ * Gets the execution command attached to this status command.
+ *
+ * @return the attached execution command, or {@code null} if none was set
+ */
+ public ExecutionCommand getExecutionCommand() {
+ return executionCommand;
+ }
+
+ /**
+ * Attaches an execution command to this status command.
+ *
+ * @param executionCommand the execution command to attach
+ */
+ public void setExecutionCommand(ExecutionCommand executionCommand) {
+ this.executionCommand = executionCommand;
+ }
+
+ /**
+ * Gets the desired state carried by this status command.
+ *
+ * @return the desired state, or {@code null} if none was set
+ */
+ public State getDesiredState() {
+ return desiredState;
+ }
+
+ /**
+ * Sets the desired state carried by this status command.
+ *
+ * @param desiredState the desired state to store
+ */
+ public void setDesiredState(State desiredState) {
+ this.desiredState = desiredState;
+ }
+
+ /**
+ * Gets the payload level of this status command.
+ *
+ * @return the payload level; {@code DEFAULT} unless another level was set
+ */
+ public StatusCommandPayload getPayloadLevel() {
+ return payloadLevel;
+ }
+
+ /**
+ * Sets the payload level of this status command.
+ *
+ * @param payloadLevel the payload level to store
+ */
+ public void setPayloadLevel(StatusCommandPayload payloadLevel) {
+ this.payloadLevel = payloadLevel;
+ }
+
public String getClusterName() {
return clusterName;
}
@@ -118,4 +152,13 @@ public class StatusCommand extends AgentCommand {
public String getHostname() {
return hostname;
}
+
+ /**
+ * Amount of detail carried by a status command.
+ */
+ public enum StatusCommandPayload {
+ /** The minimal payload for status; the agent adds the necessary details. */
+ MINIMAL,
+ /** Default payload, backward compatible. */
+ DEFAULT,
+ /** Has enough details to construct START or INSTALL commands. */
+ EXECUTION_COMMAND
+ }
}