You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2016/12/02 21:24:48 UTC

[07/24] ambari git commit: AMBARI-19051. Stage is sometimes marked as failed on command reschedule. (mpapirkovskyy)

AMBARI-19051. Stage is sometimes marked as failed on command reschedule. (mpapirkovskyy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d24beb17
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d24beb17
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d24beb17

Branch: refs/heads/branch-feature-AMBARI-18456
Commit: d24beb17e238391b196a2bfae1217035678a0a14
Parents: 615438b
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Thu Dec 1 18:48:25 2016 +0200
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Fri Dec 2 09:51:16 2016 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 11 +++++
 .../test/python/ambari_agent/TestActionQueue.py | 48 ++++++++++++++++++++
 2 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d24beb17/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4416b9a..cb4fcb9 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -300,6 +300,7 @@ class ActionQueue(threading.Thread):
 
     logger.info("Command execution metadata - taskId = {taskId}, retry enabled = {retryAble}, max retry duration (sec) = {retryDuration}, log_output = {log_command_output}".
                  format(taskId=taskId, retryAble=retryAble, retryDuration=retryDuration, log_command_output=log_command_output))
+    command_canceled = False
     while retryDuration >= 0:
       numAttempts += 1
       start = 0
@@ -328,6 +329,7 @@ class ActionQueue(threading.Thread):
           status = self.FAILED_STATUS
           if (commandresult['exitcode'] == -signal.SIGTERM) or (commandresult['exitcode'] == -signal.SIGKILL):
             logger.info('Command with taskId = {cid} was canceled!'.format(cid=taskId))
+            command_canceled = True
             break
 
       if status != self.COMPLETED_STATUS and retryAble and retryDuration > 0:
@@ -344,6 +346,15 @@ class ActionQueue(threading.Thread):
                     .format(cid=taskId, status=status, retryAble=retryAble, retryDuration=retryDuration, delay=delay))
         break
 
+    # Do not report a canceled task as failed when the server has rescheduled it (same taskId is in the queue again)
+    if command_canceled:
+      with self.commandQueue.mutex:
+        for com in self.commandQueue.queue:
+          if com['taskId'] == command['taskId']:
+            logger.info('Command with taskId = {cid} was rescheduled by server. '
+                        'Fail report on cancelled command won\'t be sent with heartbeat.'.format(cid=taskId))
+            return
+
     # final result to stdout
     commandresult['stdout'] += '\n\nCommand completed successfully!\n' if status == self.COMPLETED_STATUS else '\n\nCommand failed after ' + str(numAttempts) + ' tries\n'
     logger.info('Command with taskId = {cid} completed successfully!'.format(cid=taskId) if status == self.COMPLETED_STATUS else 'Command with taskId = {cid} failed after {attempts} tries'.format(cid=taskId, attempts=numAttempts))

http://git-wip-us.apache.org/repos/asf/ambari/blob/d24beb17/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 4a63f7c..65127f2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -27,6 +27,7 @@ import os, errno, time, pprint, tempfile, threading
 import sys
 from threading import Thread
 import copy
+import signal
 
 from mock.mock import patch, MagicMock, call
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
@@ -690,6 +691,53 @@ class TestActionQueue(TestCase):
     report = actionQueue.result()
     self.assertEqual(len(report['reports']), 0)
 
+  def test_cancel_with_reschedule_command(self):
+    config = AmbariConfig()
+    tempdir = tempfile.gettempdir()
+    config.set('agent', 'prefix', tempdir)
+    config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache")
+    config.set('agent', 'tolerate_download_failures', "true")
+    dummy_controller = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    unfreeze_flag = threading.Event()
+    python_execution_result_dict = {
+      'stdout': 'out',
+      'stderr': 'stderr',
+      'structuredOut' : '',
+      'status' : '',
+      'exitcode' : -signal.SIGTERM  # exit code of a process terminated by SIGTERM
+    }
+
+    def side_effect(command, tmpoutfile, tmperrfile, override_output_files=True, retry=False):
+      unfreeze_flag.wait()  # block the mocked command until the test sets the flag
+      return python_execution_result_dict
+    def patched_aq_execute_command(command):
+      # Apply the runCommand patch inside the worker thread that calls execute_command
+      with patch.object(CustomServiceOrchestrator, "runCommand") as runCommand_mock:
+        runCommand_mock.side_effect = side_effect
+        actionQueue.execute_command(command)
+
+    # Run execute_command in a separate thread so this thread can poll the results
+    execution_thread = Thread(target = patched_aq_execute_command ,
+                              args = (self.datanode_install_command, ))
+    execution_thread.start()
+    # Poll until result() returns at least one report for the blocked command
+    # (presumably the in-progress report; its content is not inspected here)
+    while True:
+      time.sleep(0.1)
+      report = actionQueue.result()
+      if len(report['reports']) != 0:
+        break
+
+    unfreeze_flag.set()
+    # Let the mocked command return, then poll until result() returns no reports
+    while len(report['reports']) != 0:
+      time.sleep(0.1)
+      report = actionQueue.result()
+
+    # NOTE(review): this holds trivially once the loop above exits - consider a stronger check
+    self.assertEqual(len(report['reports']), 0)
+
 
   @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
   @patch.object(CustomServiceOrchestrator, "runCommand")