You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/02/03 14:53:19 UTC
[15/50] [abbrv] ambari git commit: AMBARI-19802. Debug: agent randomly losing heartbeat with the server. (Attila Doroszlai via stoader)
AMBARI-19802. Debug: agent randomly losing heartbeat with the server. (Attila Doroszlai via stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/26273808
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/26273808
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/26273808
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 2627380840c1ae57d21cc60b666b4a503863366e
Parents: 0ec0597
Author: Attila Doroszlai <ad...@hortonworks.com>
Authored: Wed Feb 1 19:06:29 2017 +0100
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Wed Feb 1 19:06:29 2017 +0100
----------------------------------------------------------------------
.../src/main/python/ambari_agent/Controller.py | 51 +++++++++++++-------
.../ambari_agent/StatusCommandsExecutor.py | 1 +
2 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/26273808/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 6370715..45c057f 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -274,7 +274,7 @@ class Controller(threading.Thread):
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
retry = False
certVerifFailed = False
- state_interval = self.config.get('heartbeat', 'state_interval_seconds', '60')
+ state_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
# last time when state was successfully sent to server
last_state_timestamp = 0.0
@@ -289,27 +289,34 @@ class Controller(threading.Thread):
getrecoverycommands_timestamp = 0.0
getrecoverycommands_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+ heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+
while not self.DEBUG_STOP_HEARTBEATING:
- heartbeat_interval = self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
+ current_time = time.time()
+ logging_level = logging.DEBUG
+ if current_time - heartbeat_running_msg_timestamp > state_interval:
+ # log more steps every minute or so
+ logging_level = logging.INFO
+ heartbeat_running_msg_timestamp = current_time
try:
- crt_time = time.time()
- if crt_time - heartbeat_running_msg_timestamp > int(state_interval):
- logger.info("Heartbeat (response id = %s) with server is running...", self.responseId)
- heartbeat_running_msg_timestamp = crt_time
+ logger.log(logging_level, "Heartbeat (response id = %s) with server is running...", self.responseId)
send_state = False
if not retry:
- if crt_time - last_state_timestamp > int(state_interval):
+ if current_time - last_state_timestamp > state_interval:
send_state = True
- data = json.dumps(
- self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
+ logger.log(logging_level, "Building heartbeat message")
+
+ data = json.dumps(self.heartbeat.build(self.responseId, send_state, self.hasMappedComponents))
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
-
- logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data)
+ if logger.isEnabledFor(logging.DEBUG):
+ logger.log(logging_level, "Sending Heartbeat (id = %s): %s", self.responseId, data)
+ else:
+ logger.log(logging_level, "Sending Heartbeat (id = %s)", self.responseId)
response = self.sendRequest(self.heartbeatUrl, data)
exitStatus = 0
@@ -321,8 +328,7 @@ class Controller(threading.Thread):
serverId = int(response['responseId'])
-
- logger.debug('Heartbeat response received (id = %s)', serverId)
+ logger.log(logging_level, 'Heartbeat response received (id = %s)', serverId)
cluster_size = int(response['clusterSize']) if 'clusterSize' in response.keys() else -1
@@ -331,8 +337,7 @@ class Controller(threading.Thread):
if cluster_size > 0 \
else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MAX_SEC
-
- logger.debug("Heartbeat interval is %s seconds", heartbeat_interval)
+ logger.log(logging_level, "Heartbeat interval is %s seconds", heartbeat_interval)
if 'hasMappedComponents' in response.keys():
self.hasMappedComponents = response['hasMappedComponents'] is not False
@@ -364,10 +369,11 @@ class Controller(threading.Thread):
else:
self.responseId = serverId
if send_state:
- last_state_timestamp = time.time()
+ last_state_timestamp = current_time
# if the response contains configurations, update the in-memory and
# disk-based configuration cache (execution and alert commands have this)
+ logger.log(logging_level, "Updating configurations from heartbeat")
self.cluster_configuration.update_configurations_from_heartbeat(response)
response_keys = response.keys()
@@ -375,6 +381,8 @@ class Controller(threading.Thread):
# there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
# this can cause command failure instead result suppression
# so canceling and putting rescheduled commands should be executed atomically
+ if 'cancelCommands' in response_keys or 'executionCommands' in response_keys:
+ logger.log(logging_level, "Adding cancel/execution commands")
with self.actionQueue.lock:
if 'cancelCommands' in response_keys:
self.cancelCommandInQueue(response['cancelCommands'])
@@ -388,9 +396,10 @@ class Controller(threading.Thread):
# try storing execution command details and desired state
self.addToStatusQueue(response['statusCommands'])
- if crt_time - getrecoverycommands_timestamp > int(getrecoverycommands_interval):
- getrecoverycommands_timestamp = crt_time
+ if current_time - getrecoverycommands_timestamp > getrecoverycommands_interval:
+ getrecoverycommands_timestamp = current_time
if not self.actionQueue.tasks_in_progress_or_pending():
+ logger.log(logging_level, "Adding recovery commands")
recovery_commands = self.recovery_manager.get_recovery_commands()
for recovery_command in recovery_commands:
logger.info("Adding recovery command %s for component %s",
@@ -398,9 +407,11 @@ class Controller(threading.Thread):
self.addToQueue([recovery_command])
if 'alertDefinitionCommands' in response_keys:
+ logger.log(logging_level, "Updating alert definitions")
self.alert_scheduler_handler.update_definitions(response)
if 'alertExecutionCommands' in response_keys:
+ logger.log(logging_level, "Executing alert commands")
self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands'])
if "true" == response['restartAgent']:
@@ -414,6 +425,7 @@ class Controller(threading.Thread):
if "recoveryConfig" in response:
# update the list of components enabled for recovery
+ logger.log(logging_level, "Updating recovery config")
self.recovery_manager.update_configuration_from_registration(response)
retry = False
@@ -455,12 +467,15 @@ class Controller(threading.Thread):
# Sleep for some time
timeout = heartbeat_interval - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
+ logger.log(logging_level, "Waiting %s for next heartbeat", timeout)
if 0 == self.heartbeat_stop_callback.wait(timeout, self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS):
# Stop loop when stop event received
logger.info("Stop event received")
self.DEBUG_STOP_HEARTBEATING=True
+ logger.log(logging_level, "Wait for next heartbeat over")
+
def spawnStatusCommandsExecutorProcess(self):
'''
Starts a new StatusCommandExecutor child process. In case there is a running instance
http://git-wip-us.apache.org/repos/asf/ambari/blob/26273808/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index fbb29f4..2f15770 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -47,6 +47,7 @@ class StatusCommandsExecutor(multiprocessing.Process):
def run(self):
try:
bind_debug_signal_handlers()
+ logger.info("StatusCommandsExecutor starting")
while True:
command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears
logger.debug("Running status command for {0}".format(command['componentName']))