You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:05:06 UTC

[25/50] [abbrv] ambari git commit: AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)

AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/315691fc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/315691fc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/315691fc

Branch: refs/heads/branch-dev-patch-upgrade
Commit: 315691fc790a94e4830fac0ccbb92046bd9a3719
Parents: c458134
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Jan 3 18:41:42 2017 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Jan 3 20:18:35 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py    | 14 ++++++++------
 .../src/main/python/ambari_agent/Controller.py     | 17 +++++++++++------
 2 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 793eeba..d70b344 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -89,6 +89,7 @@ class ActionQueue(threading.Thread):
     self.parallel_execution = config.get_parallel_exec_option()
     if self.parallel_execution == 1:
       logger.info("Parallel execution is enabled, will execute agent commands in parallel")
+    self.lock = threading.Lock()
 
   def stop(self):
     self._stop.set()
@@ -352,12 +353,13 @@ class ActionQueue(threading.Thread):
 
     # do not fail task which was rescheduled from server
     if command_canceled:
-      with self.commandQueue.mutex:
-        for com in self.commandQueue.queue:
-          if com['taskId'] == command['taskId']:
-            logger.info('Command with taskId = {cid} was rescheduled by server. '
-                        'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
-            return
+      with self.lock:
+        with self.commandQueue.mutex:
+          for com in self.commandQueue.queue:
+            if com['taskId'] == command['taskId']:
+              logger.info('Command with taskId = {cid} was rescheduled by server. '
+                          'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+              return
 
     # final result to stdout
     commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'

http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index a762d3f..56b1992 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -360,13 +360,18 @@ class Controller(threading.Thread):
         self.cluster_configuration.update_configurations_from_heartbeat(response)
 
         response_keys = response.keys()
-        if 'cancelCommands' in response_keys:
-          self.cancelCommandInQueue(response['cancelCommands'])
 
-        if 'executionCommands' in response_keys:
-          execution_commands = response['executionCommands']
-          self.recovery_manager.process_execution_commands(execution_commands)
-          self.addToQueue(execution_commands)
+        # There is a case where a canceled task is processed in ActionQueue.execute before the rescheduled task is added to the queue.
+        # That causes a command failure to be reported instead of the result being suppressed,
+        # so canceling commands and queueing rescheduled commands must happen atomically.
+        with self.actionQueue.lock:
+          if 'cancelCommands' in response_keys:
+            self.cancelCommandInQueue(response['cancelCommands'])
+
+          if 'executionCommands' in response_keys:
+            execution_commands = response['executionCommands']
+            self.recovery_manager.process_execution_commands(execution_commands)
+            self.addToQueue(execution_commands)
 
         if 'statusCommands' in response_keys:
           # try storing execution command details and desired state