You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2018/06/14 16:09:00 UTC

[ambari] 01/02: AMBARI-24097. Canceling task during blueprint install results in agent not responding to any other tasks (aonishuk)

This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 451cdacb6bd6549de297ef4c995111a6e01832f6
Author: Andrew Onishuk <ao...@hortonworks.com>
AuthorDate: Thu Jun 14 12:33:52 2018 +0300

    AMBARI-24097. Canceling task during blueprint install results in agent not responding to any other tasks (aonishuk)
---
 ambari-agent/src/main/python/ambari_agent/ActionQueue.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index f0c996b..6ee3ec0 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -83,6 +83,8 @@ class ActionQueue(threading.Thread):
     self.tmpdir = self.config.get('agent', 'prefix')
     self.customServiceOrchestrator = initializer_module.customServiceOrchestrator
     self.parallel_execution = self.config.get_parallel_exec_option()
+    self.taskIdsToCancel = set()
+    self.cancelEvent = threading.Event()
     self.component_status_executor = initializer_module.component_status_executor
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
@@ -133,6 +135,8 @@ class ActionQueue(threading.Thread):
 
       # Kill if in progress
       self.customServiceOrchestrator.cancel_command(task_id, reason)
+      self.taskIdsToCancel.add(task_id)
+      self.cancelEvent.set()
 
   def run(self):
     while not self.stop_event.is_set():
@@ -275,6 +279,13 @@ class ActionQueue(threading.Thread):
                  format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output))
     command_canceled = False
     while retryDuration >= 0:
+      if taskId in self.taskIdsToCancel:
+        logger.info('Command with taskId = {0} canceled'.format(taskId))
+        command_canceled = True
+
+        self.taskIdsToCancel.discard(taskId)
+        break
+
       numAttempts += 1
       start = 0
       if retryAble:
@@ -303,6 +314,7 @@ class ActionQueue(threading.Thread):
           if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
             logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
             command_canceled = True
+            self.taskIdsToCancel.discard(taskId)
             break
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
@@ -316,7 +328,7 @@ class ActionQueue(threading.Thread):
           command['agentLevelParams'] = {}
 
         command['agentLevelParams']['commandBeingRetried'] = "true"
-        time.sleep(delay)
+        self.cancelEvent.wait(delay) # wake up if something was canceled
         continue
       else:
         logger.info("Quit retrying for command with taskId = {cid}. Status: {status}, retryAble: {retryAble}, retryDuration (sec): {retryDuration}, last delay (sec): {delay}"

-- 
To stop receiving notification emails like this one, please contact
aonishuk@apache.org.