You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2017/01/05 00:05:06 UTC
[25/50] [abbrv] ambari git commit: AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)
AMBARI-19342. Race condition in agent on command reschedule. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/315691fc
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/315691fc
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/315691fc
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 315691fc790a94e4830fac0ccbb92046bd9a3719
Parents: c458134
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Tue Jan 3 18:41:42 2017 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Tue Jan 3 20:18:35 2017 +0200
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 14 ++++++++------
.../src/main/python/ambari_agent/Controller.py | 17 +++++++++++------
2 files changed, 19 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 793eeba..d70b344 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -89,6 +89,7 @@ class ActionQueue(threading.Thread):
self.parallel_execution = config.get_parallel_exec_option()
if self.parallel_execution == 1:
logger.info("Parallel execution is enabled, will execute agent commands in parallel")
+ self.lock = threading.Lock()
def stop(self):
self._stop.set()
@@ -352,12 +353,13 @@ class ActionQueue(threading.Thread):
# do not fail task which was rescheduled from server
if command_canceled:
- with self.commandQueue.mutex:
- for com in self.commandQueue.queue:
- if com['taskId'] == command['taskId']:
- logger.info('Command with taskId = {cid} was rescheduled by server. '
- 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
- return
+ with self.lock:
+ with self.commandQueue.mutex:
+ for com in self.commandQueue.queue:
+ if com['taskId'] == command['taskId']:
+ logger.info('Command with taskId = {cid} was rescheduled by server. '
+ 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+ return
# final result to stdout
commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'
http://git-wip-us.apache.org/repos/asf/ambari/blob/315691fc/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index a762d3f..56b1992 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -360,13 +360,18 @@ class Controller(threading.Thread):
self.cluster_configuration.update_configurations_from_heartbeat(response)
response_keys = response.keys()
- if 'cancelCommands' in response_keys:
- self.cancelCommandInQueue(response['cancelCommands'])
- if 'executionCommands' in response_keys:
- execution_commands = response['executionCommands']
- self.recovery_manager.process_execution_commands(execution_commands)
- self.addToQueue(execution_commands)
+ # there's case when canceled task can be processed in Action Queue.execute before adding rescheduled task to queue
+ # this can cause command failure instead result suppression
+ # so canceling and putting rescheduled commands should be executed atomically
+ with self.actionQueue.lock:
+ if 'cancelCommands' in response_keys:
+ self.cancelCommandInQueue(response['cancelCommands'])
+
+ if 'executionCommands' in response_keys:
+ execution_commands = response['executionCommands']
+ self.recovery_manager.process_execution_commands(execution_commands)
+ self.addToQueue(execution_commands)
if 'statusCommands' in response_keys:
# try storing execution command details and desired state