You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by nc...@apache.org on 2016/12/02 20:26:32 UTC
[34/50] [abbrv] ambari git commit: AMBARI-19051. Stage is sometimes
marked as failed on command reschedule. (mpapirkovskyy)
AMBARI-19051. Stage is sometimes marked as failed on command reschedule. (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d24beb17
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d24beb17
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d24beb17
Branch: refs/heads/branch-dev-patch-upgrade
Commit: d24beb17e238391b196a2bfae1217035678a0a14
Parents: 615438b
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Dec 1 18:48:25 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Dec 2 09:51:16 2016 +0200
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 11 +++++
.../test/python/ambari_agent/TestActionQueue.py | 48 ++++++++++++++++++++
2 files changed, 59 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/d24beb17/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4416b9a..cb4fcb9 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -300,6 +300,7 @@ class ActionQueue(threading.Thread):
logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec) = {retryDuration}, log_output = {log_command_output}".
format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output))
+ command_canceled = False
while retryDuration >= 0:
numAttempts += 1
start = 0
@@ -328,6 +329,7 @@ class ActionQueue(threading.Thread):
status = self.FAILED_STATUS
if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
+ command_canceled = True
break
if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
@@ -344,6 +346,15 @@ class ActionQueue(threading.Thread):
.format(cid=taskId, status=status, retryAble=retryAble, retryDuration=retryDuration, delay=delay))
break
+ # do not fail task which was rescheduled from server
+ if command_canceled:
+ with self.commandQueue.mutex:
+ for com in self.commandQueue.queue:
+ if com['taskId'] == command['taskId']:
+ logger.info('Command with taskId = {cid} was rescheduled by server. '
+ 'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+ return
+
# final result to stdout
commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'
logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == self.COMPLETED_STATUS else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=numAttempts))
http://git-wip-us.apache.org/repos/asf/ambari/blob/d24beb17/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 4a63f7c..65127f2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading
import sys
from threading import Thread
import copy
+import signal
from mock.mock import patch, MagicMock, call
from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
@@ -690,6 +691,53 @@ class TestActionQueue(TestCase):
report = actionQueue.result()
self.assertEqual(len(report['reports']), 0)
+ def test_cancel_with_reschedule_command(self):
+ config = AmbariConfig()
+ tempdir = tempfile.gettempdir()
+ config.set('agent', 'prefix', tempdir)
+ config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+ config.set('agent', 'tolerate_download_failures', "true")
+ dummy_controller = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ unfreeze_flag = threading.Event()
+ python_execution_result_dict = {
+ 'stdout': 'out',
+ 'stderr': 'stderr',
+ 'structuredOut' : '',
+ 'status' : '',
+ 'exitcode' : -signal.SIGTERM  # exit code that ActionQueue logs as "was canceled"
+ }
+
+ def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+ unfreeze_flag.wait()
+ return python_execution_result_dict
+ def patched_aq_execute_command(command):
+ # The patch is applied here, inside the function that the worker thread runs, so it is active while execute_command executes
+ with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+ runCommand_mock.side_effect = side_effect
+ actionQueue.execute_command(command)
+
+ # Run execute_command in a separate thread; it blocks in side_effect until unfreeze_flag is set
+ execution_thread = Thread(target = patched_aq_execute_command ,
+ args = (self.datanode_install_command, ))
+ execution_thread.start()
+ # check in progress report
+ # poll every 0.1 s until result() returns at least one report
+ while True:
+ time.sleep(0.1)
+ report = actionQueue.result()
+ if len(report['reports']) != 0:
+ break
+
+ unfreeze_flag.set()
+ # poll every 0.1 s until result() returns no reports at all
+ while len(report['reports']) != 0:
+ time.sleep(0.1)
+ report = actionQueue.result()
+
+ # NOTE(review): the loop above exits only once the report list is empty, so this assertion cannot fail as written
+ self.assertEqual(len(report['reports']), 0)
+
@patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
@patch.object(CustomServiceOrchestrator, "runCommand")