You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2020/11/25 09:52:50 UTC

[ambari] branch branch-2.7 updated: AMBARI-25589. When hearbeat is lost sometimes start/stop tasks can hang for a long time. (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 93a4d3d  AMBARI-25589. When hearbeat is lost sometimes start/stop tasks can hang for a long time. (aonishuk)
93a4d3d is described below

commit 93a4d3ddf19edfd6bbbc58e6c8396679a5f68617
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Tue Nov 24 13:21:50 2020 +0200

    AMBARI-25589. When hearbeat is lost sometimes start/stop tasks can hang for a long time. (aonishuk)
---
 .../main/python/ambari_agent/CommandStatusDict.py    | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index d27dad4..89d0a3d 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -68,19 +68,17 @@ class CommandStatusDict():
     """
     Stores new version of report for command (replaces previous)
     """
-    from ActionQueue import ActionQueue
-
-    key = command['taskId']
-    # delete stale data about this command
-    self.delete_command_data(key)
+    with self.lock:
+      key = command['taskId']
+      # delete stale data about this command
+      self.delete_command_data(key)
+      self.queue_report_sending(key, command, report)
 
-    is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
-    updatable = report['status'] == CommandStatus.in_progress and self.command_update_output
+      report_dict = {command['clusterId']: [report]}
+      is_sent, correlation_id = self.force_update_to_server(report_dict)
 
-    if not is_sent or updatable:
-      self.queue_report_sending(key, command, report)
-    else:
-      self.server_responses_listener.listener_functions_on_error[correlation_id] = lambda headers, message: self.queue_report_sending(key, command, report)
+      self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: \
+        self.clear_reported_reports(report_dict)
 
   def queue_report_sending(self, key, command, report):
     with self.lock: