You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/07/30 12:53:34 UTC
[ambari] branch branch-2.7 updated: AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)
This is an automated email from the ASF dual-hosted git repository.
aonishuk pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 74690fa AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)
74690fa is described below
commit 74690fa9d3f4ab6d31d829ee4b9326528b38522f
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Sun Jul 29 13:11:35 2018 +0300
AMBARI-24349. Rescheduled and canceled tasks stay in progress forever (aonishuk)
---
ambari-agent/src/main/python/ambari_agent/ActionQueue.py | 1 +
ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py | 10 ++++++----
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 393e651..072083a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -327,6 +327,7 @@ class ActionQueue(threading.Thread):
if com['taskId'] == command['taskId']:
logger.info("Command with taskId = {cid} was rescheduled by server. "
"Fail report on cancelled command won't be sent with heartbeat.".format(cid=taskId))
+ self.commandStatuses.delete_command_data(command['taskId'])
return
# final result to stdout
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index 567ea2d..d27dad4 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -58,6 +58,11 @@ class CommandStatusDict():
self.log_max_symbols_size = initializer_module.config.log_max_symbols_size
self.reported_reports = set()
+ def delete_command_data(self, key):
+ # delete stale data about this command
+ with self.lock:
+ self.reported_reports.discard(key)
+ self.current_state.pop(key, None)
def put_command_status(self, command, report):
"""
@@ -66,11 +71,8 @@ class CommandStatusDict():
from ActionQueue import ActionQueue
key = command['taskId']
-
# delete stale data about this command
- with self.lock:
- self.reported_reports.discard(key)
- self.current_state.pop(key, None)
+ self.delete_command_data(key)
is_sent, correlation_id = self.force_update_to_server({command['clusterId']: [report]})
updatable = report['status'] == CommandStatus.in_progress and self.command_update_output