You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/07/30 12:53:25 UTC

[ambari] branch trunk updated: AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 02ba3c4  AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)
02ba3c4 is described below

commit 02ba3c4be31fc0364811dd1a8a87fa031768368f
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Sun Jul 29 13:11:30 2018 +0300

    AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)
---
 ambari-agent/src/main/python/ambari_agent/ActionQueue.py       |  1 +
 ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py | 10 ++++++----
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 393e651..072083a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -327,6 +327,7 @@ class ActionQueue(threading.Thread):
             if com['taskId'] == command['taskId']:
               logger.info("Command with taskId = {cid} was rescheduled by server. "
                           "Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId))
+              self.commandStatuses.delete_command_data(command['taskId'])
               return
 
     # final result to stdout
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 567ea2d..d27dad4 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -58,6 +58,11 @@ class CommandStatusDict():
     self.log_max_symbols_size = initializer_module.config.log_max_symbols_size
     self.reported_reports = set()
 
+  def delete_command_data(self, key):
+    # delete stale data about this command
+    with self.lock:
+      self.reported_reports.discard(key)
+      self.current_state.pop(key, None)
 
   def put_command_status(self, command, report):
     """
@@ -66,11 +71,8 @@ class CommandStatusDict():
     from ActionQueue import ActionQueue
 
     key = command['taskId']
-
     # delete stale data about this command
-    with self.lock:
-      self.reported_reports.discard(key)
-      self.current_state.pop(key, None)
+    self.delete_command_data(key)
 
     is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
     updatable = report['status'] == CommandStatus.in_progress and self.command_update_output