You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2020/11/25 09:52:50 UTC
[ambari] branch branch-2.7 updated: AMBARI-25589. When hearbeat is
lost sometimes start/stop tasks can hang for a long time. (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 93a4d3d AMBARI-25589. When hearbeat is lost sometimes start/stop tasks can hang for a long time. (aonishuk)
93a4d3d is described below
commit 93a4d3ddf19edfd6bbbc58e6c8396679a5f68617
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Nov 24 13:21:50 2020 +0200
AMBARI-25589. When hearbeat is lost sometimes start/stop tasks can hang for a long time. (aonishuk)
---
.../main/python/ambari_agent/CommandStatusDict.py | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index d27dad4..89d0a3d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -68,19 +68,17 @@ class CommandStatusDict():
"""
Stores new version of report for command (replaces previous)
"""
- from ActionQueue import ActionQueue
-
- key = command['taskId']
- # delete stale data about this command
- self.delete_command_data(key)
+ with self.lock:
+ key = command['taskId']
+ # delete stale data about this command
+ self.delete_command_data(key)
+ self.queue_report_sending(key, command, report)
- is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
- updatable = report['status'] == CommandStatus.in_progress and self.command_update_output
+ report_dict = {command['clusterId']: [report]}
+ is_sent, correlation_id = self.force_update_to_server(report_dict)
- if not is_sent or updatable:
- self.queue_report_sending(key, command, report)
- else:
- self.server_responses_listener.listener_functions_on_error[correlation_id] = lambda headers, message: self.queue_report_sending(key, command, report)
+ self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: \
+ self.clear_reported_reports(report_dict)
def queue_report_sending(self, key, command, report):
with self.lock: