You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@ambari.apache.org by ds...@apache.org on 2014/08/07 16:16:04 UTC

git commit: AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen)

Repository: ambari
Updated Branches:
  refs/heads/trunk 0f24bc24b -> 5bcd37529


AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5bcd3752
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5bcd3752
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5bcd3752

Branch: refs/heads/trunk
Commit: 5bcd375294cc072665097187c76199696859c00b
Parents: 0f24bc2
Author: Dmytro Sen <ds...@hortonworks.com>
Authored: Thu Aug 7 17:15:54 2014 +0300
Committer: Dmytro Sen <ds...@hortonworks.com>
Committed: Thu Aug 7 17:15:54 2014 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 33 ++++++++
 .../src/main/python/ambari_agent/Controller.py  | 18 ++++-
 .../ambari_agent/CustomServiceOrchestrator.py   | 28 ++++++-
 .../main/python/ambari_agent/PythonExecutor.py  |  7 +-
 .../test/python/ambari_agent/TestActionQueue.py | 39 +++++++++
 .../test/python/ambari_agent/TestController.py  |  6 ++
 .../TestCustomServiceOrchestrator.py            | 84 +++++++++++++++++++-
 .../python/ambari_agent/TestPythonExecutor.py   | 18 +++--
 8 files changed, 220 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 58d3e75..d3aad6e 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -108,6 +108,32 @@ class ActionQueue(threading.Thread):
       logger.debug(pprint.pformat(command))
       self.commandQueue.put(command)

+  def cancel(self, commands):
+    """Drop queued commands whose taskId matches a cancel request and kill
+    the process of a matching command that is already running."""
+    for command in commands:
+
+      logger.info("Canceling command {tid}".format(tid = str(command['target_task_id'])))
+      logger.debug(pprint.pformat(command))
+
+      task_id = command['target_task_id']
+      reason = command['reason']
+
+      # Filter in place under the queue's mutex (as reset() does) instead of
+      # swapping in a new Queue while the worker thread may be blocked in get()
+      queue = self.commandQueue
+      with queue.mutex:
+        for queued_command in list(queue.queue):
+          if queued_command['taskId'] == task_id:
+            queue.queue.remove(queued_command)
+            logger.info("Canceling " + queued_command['commandType'] + \
+                        " for service " + queued_command['serviceName'] + \
+                        " of cluster " +  queued_command['clusterName'] + \
+                        " with taskId " + str(task_id))
+
+      # Kill if in progress; done for every cancel request, not just the last
+      self.customServiceOrchestrator.cancel_command(task_id, reason)
+
   def run(self):
     while not self.stopped():
       while  not self.statusCommandQueue.empty():
@@ -287,3 +313,10 @@ class ActionQueue(threading.Thread):
     Actions that are executed every time when command status changes
     """
     self.controller.heartbeat_wait_event.set()
+
+  # Removes all pending (not yet dequeued) commands from the queue
+  def reset(self):
+    queue = self.commandQueue
+    with queue.mutex:
+      queue.queue.clear()
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 36ed94c..87af939 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -78,7 +78,7 @@ class Controller(threading.Thread):
   def __del__(self):
     logger.info("Server connection disconnected.")
     pass
-
+  
   def registerWithServer(self):
     LiveStatus.SERVICES = []
     LiveStatus.CLIENT_COMPONENTS = []
@@ -142,7 +142,12 @@ class Controller(threading.Thread):
       pass
     return ret
 
-
+  def cancelCommandInQueue(self, commands):
+    """Remove the given commands from the queue and kill their processes if already running."""
+    if commands:
+      self.actionQueue.cancel(commands)
+    pass
+  
   def addToQueue(self, commands):
     """Add to the queue for running the commands """
     """ Put the required actions into the Queue """
@@ -223,6 +228,10 @@ class Controller(threading.Thread):
         else:
           self.responseId=serverId
 
+        if 'cancelCommands' in response.keys():
+          self.cancelCommandInQueue(response['cancelCommands'])
+          pass
+
         if 'executionCommands' in response.keys():
           self.addToQueue(response['executionCommands'])
           pass
@@ -309,6 +318,11 @@ class Controller(threading.Thread):
     logger.info("Registration response from %s was %s", self.serverHostname, message)
 
     if self.isRegistered:
+      # Clear the command queue so that commands queued before this
+      # (re-)registration are not executed as stale work
+      logger.info('Resetting ActionQueue...')
+      self.actionQueue.reset()
+
       # Process callbacks
       for callback in self.registration_listeners:
         callback()

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 4331678..e13e543 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -22,6 +22,7 @@ import logging
 import os
 import json
 import sys
+import shell
 
 from FileCache import FileCache
 from AgentException import AgentException
@@ -68,7 +69,19 @@ class CustomServiceOrchestrator():
       os.unlink(self.status_commands_stderr)
     except OSError:
       pass # Ignore fail
+    self.commands_in_progress = {}
 
+  def map_task_to_process(self, task_id, processId):
+    self.commands_in_progress[task_id] = processId
+
+  def cancel_command(self, task_id, reason):
+    pid = self.commands_in_progress.get(task_id)
+    if isinstance(pid, int): # skip unknown tasks and ones already canceled
+      self.commands_in_progress[task_id] = reason
+      logger.info("Canceling command with task_id - {tid}, " \
+                  "reason - {reason} . Killing process {pid}"
+                  .format(tid = str(task_id), reason = reason, pid = pid))
+      shell.kill_process_with_children(pid)
 
   def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
                  override_output_files = True):
@@ -132,7 +145,8 @@ class CustomServiceOrchestrator():
         script_params = [command_name, json_path, current_base_dir]
         ret = self.python_executor.run_file(py_file, script_params,
                                self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
-                               tmpstrucoutfile, logger_level, override_output_files)
+                               tmpstrucoutfile, logger_level, self.map_task_to_process,
+                               task_id, override_output_files)
         # Next run_file() invocations should always append to current output
         override_output_files = False
         if ret['exitcode'] != 0:
@@ -141,6 +155,18 @@ class CustomServiceOrchestrator():
       if not ret: # Something went wrong
         raise AgentException("No script has been executed")
 
+      # if canceled, cancel_command() left the reason in place of the pid
+      pid = self.commands_in_progress.pop(task_id, None)
+      if pid is not None and not isinstance(pid, int):
+        reason = '\nCommand aborted. ' + pid
+        ret['stdout'] += reason
+        ret['stderr'] += reason
+
+        with open(tmpoutfile, "a") as f:
+          f.write(reason)
+        with open(tmperrfile, "a") as f:
+          f.write(reason)
+
     except Exception: # We do not want to let agent fail completely
       exc_type, exc_obj, exc_tb = sys.exc_info()
       message = "Catched an exception while executing "\

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index c4f1234..704e8f3 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -47,8 +47,9 @@ class PythonExecutor:
     self.config = config
     pass
 
-  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, timeout,
-               tmpstructedoutfile, logger_level, override_output_files = True):
+  def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
+               timeout, tmpstructedoutfile, logger_level, callback, task_id,
+               override_output_files = True):
     """
     Executes the specified python file in a separate subprocess.
     Method returns only when the subprocess is finished.
@@ -77,6 +78,8 @@ class PythonExecutor:
     pythonCommand = self.python_command(script, script_params)
     logger.info("Running command " + pprint.pformat(pythonCommand))
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+    # Report the launched subprocess pid for this task_id back to the caller
+    callback(task_id, process.pid)
     logger.debug("Launching watchdog thread")
     self.event.clear()
     self.python_process_has_been_killed = False

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 04c12b2..e06efe4 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -487,3 +487,42 @@ class TestActionQueue(TestCase):
     self.assertTrue(requestComponentStatus_mock.called)
     self.assertEqual(len(report['componentStatus']), 1)
     self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(Queue, "get")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_reset_queue(self, CustomServiceOrchestrator_mock,
+                                get_mock, process_command_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    actionQueue.start()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    actionQueue.reset()
+    self.assertTrue(actionQueue.commandQueue.empty())
+    time.sleep(0.1)
+    actionQueue.stop()
+    actionQueue.join()
+    self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+  @patch.object(ActionQueue, "process_command")
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_cancel(self, CustomServiceOrchestrator_mock, process_command_mock):
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    # The worker thread is not started, so the queue contents stay deterministic
+    actionQueue.customServiceOrchestrator = MagicMock()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+    task_id = self.datanode_install_command['taskId']
+    actionQueue.cancel([{'target_task_id': task_id, 'reason': 'reason'}])
+    # Only the matching command is dropped, and a kill is requested for it
+    self.assertEqual(1, actionQueue.commandQueue.qsize())
+    self.assertTrue(actionQueue.commandQueue.queue[0] is self.hbase_install_command)
+    actionQueue.customServiceOrchestrator.cancel_command.assert_called_once_with(
+      task_id, 'reason')
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 6c1dcd0..9ec23db 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -236,6 +236,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue") # stub: Controller calls actionQueue.reset() once registered
+    self.controller.actionQueue = actionQueue
 
     Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
 
@@ -257,6 +259,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue") # stub: Controller calls actionQueue.reset() once registered
+    self.controller.actionQueue = actionQueue
 
     listener1 = MagicMock()
     listener2 = MagicMock()
@@ -282,6 +286,8 @@ class TestController(unittest.TestCase):
     self.controller.registerWithServer = registerWithServer
     heartbeatWithServer = MagicMock(name="heartbeatWithServer")
     self.controller.heartbeatWithServer = heartbeatWithServer
+    actionQueue = MagicMock(name="actionQueue") # stub: Controller calls actionQueue.reset() once registered
+    self.controller.actionQueue = actionQueue
 
     self.controller.isRegistered = True
     self.controller.registerAndHeartbeat()

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index e8668f6..d669cd2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -18,9 +18,11 @@ See the License for the specific language governing permissions and
 limitations under the License.
 '''
 import ConfigParser
+from multiprocessing.pool import ThreadPool
 import os
 
 import pprint
+import shell
 
 from unittest import TestCase
 import threading
@@ -185,6 +187,8 @@ class TestCustomServiceOrchestrator(TestCase):
        '/hooks_dir/prefix-command')
     dummy_controller = MagicMock()
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id} # stands in for the pid registration the mocked run_file skips
     get_hook_base_dir_mock.return_value = "/hooks/"
     # normal run case
     run_file_mock.return_value = {
@@ -208,9 +212,9 @@ class TestCustomServiceOrchestrator(TestCase):
     ret = orchestrator.runCommand(command, "out.txt", "err.txt",
               forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
     ## Check that override_output_files was true only during first call
-    self.assertEquals(run_file_mock.call_args_list[0][0][8], True)
-    self.assertEquals(run_file_mock.call_args_list[1][0][8], False)
-    self.assertEquals(run_file_mock.call_args_list[2][0][8], False)
+    self.assertEquals(run_file_mock.call_args_list[0][0][10], True) # positional arg 10 is override_output_files
+    self.assertEquals(run_file_mock.call_args_list[1][0][10], False)
+    self.assertEquals(run_file_mock.call_args_list[2][0][10], False)
     ## Check that forced_command_name was taken into account
     self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
                                   CustomServiceOrchestrator.COMMAND_NAME_STATUS)
@@ -229,6 +233,78 @@ class TestCustomServiceOrchestrator(TestCase):
 
     pass
 
+  @patch("shell.kill_process_with_children")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
+  @patch.object(FileCache, "get_service_base_dir")
+  @patch.object(FileCache, "get_hook_base_dir")
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(PythonExecutor, "run_file")
+  @patch.object(FileCache, "__init__")
+  def test_cancel_command(self, FileCache_mock,
+                      run_file_mock, dump_command_to_json_mock,
+                      get_hook_base_dir_mock, get_service_base_dir_mock,
+                      resolve_hook_script_path_mock, resolve_script_path_mock,
+                      kill_process_with_children_mock):
+    FileCache_mock.return_value = None
+    command = {
+      'role' : 'REGION_SERVER',
+      'hostLevelParams' : {
+        'stack_name' : 'HDP',
+        'stack_version' : '2.0.7',
+        'jdk_location' : 'some_location'
+      },
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'scripts/hbase_regionserver.py',
+        'command_timeout': '600',
+        'service_package_folder' : 'HBASE'
+      },
+      'taskId' : '3',
+      'roleCommand': 'INSTALL'
+    }
+    get_service_base_dir_mock.return_value = "/basedir/"
+    resolve_script_path_mock.return_value = "/basedir/scriptpath"
+    resolve_hook_script_path_mock.return_value = \
+      ('/hooks_dir/prefix-command/scripts/hook.py',
+       '/hooks_dir/prefix-command')
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
+    get_hook_base_dir_mock.return_value = "/hooks/"
+    # Synchronize with events instead of sleeps: the mocked script reports
+    # that it started, then blocks until the kill has actually been issued.
+    started, killed = threading.Event(), threading.Event()
+    kill_process_with_children_mock.side_effect = lambda pid: killed.set()
+    def side_effect(*args, **kwargs):
+      started.set()
+      killed.wait(5)
+      return {'stdout' : 'killed', 'stderr' : 'killed', 'exitcode': 1}
+    run_file_mock.side_effect = side_effect
+
+    out_fd, out = tempfile.mkstemp()
+    err_fd, err = tempfile.mkstemp()
+    os.close(out_fd) # only the paths are used; do not leak the descriptors
+    os.close(err_fd)
+    pool = ThreadPool(processes=1)
+    async_result = pool.apply_async(orchestrator.runCommand, (command, out, err))
+    started.wait(5)
+    orchestrator.cancel_command(command['taskId'], 'reason')
+    ret = async_result.get(5)
+    pool.close()
+    pool.join()
+
+    self.assertEqual(ret['exitcode'], 1)
+    self.assertEquals(ret['stdout'], 'killed\nCommand aborted. reason')
+    self.assertEquals(ret['stderr'], 'killed\nCommand aborted. reason')
+    kill_process_with_children_mock.assert_called_once_with(unix_process_id)
+    self.assertFalse(command['taskId'] in orchestrator.commands_in_progress.keys())
+    self.assertTrue(os.path.exists(out))
+    self.assertTrue(os.path.exists(err))
+    os.remove(out)
+    os.remove(err)
+
   @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
   @patch.object(PythonExecutor, "run_file")
   @patch.object(FileCache, "__init__")
@@ -252,6 +328,8 @@ class TestCustomServiceOrchestrator(TestCase):
     }
     dummy_controller = MagicMock()
     orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id} # stands in for the pid registration the mocked run_file skips
     # normal run case
     run_file_mock.return_value = {
       'stdout' : 'sss',

http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index c33b97d..d1bec5d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -55,12 +55,15 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = None
+    callback_method = MagicMock() # receives (task_id, pid) once run_file launches the subprocess
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile",
-      ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout,"INFO"))
+      ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile,
+      PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1'))
     thread.start()
     time.sleep(0.1)
     subproc_mock.finished_event.wait()
     self.assertEquals(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
+    self.assertTrue(callback_method.called)
 
 
   def test_watchdog_2(self):
@@ -83,16 +86,18 @@ class TestPythonExecutor(TestCase):
     runShellKillPgrp_method.side_effect = lambda python : python.terminate()
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
+    callback_method = MagicMock() # receives (task_id, pid) once run_file launches the subprocess
     thread = Thread(target =  executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
                                                       "/fake_tmp_dir", tmpoutfile, tmperrfile,
-                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO"))
+                                                      PYTHON_TIMEOUT_SECONDS, tmpstrucout,
+                                                      "INFO", callback_method, "1-1"))
     thread.start()
     time.sleep(0.1)
     subproc_mock.should_finish_event.set()
     subproc_mock.finished_event.wait()
     self.assertEquals(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
     self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
-
+    self.assertTrue(callback_method.called)
 
   def test_execution_results(self):
     subproc_mock = self.Subprocess_mockup()
@@ -112,10 +117,13 @@ class TestPythonExecutor(TestCase):
     executor.runShellKillPgrp = runShellKillPgrp_method
     subproc_mock.returncode = 0
     subproc_mock.should_finish_event.set()
-    result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile, "INFO")
+    callback_method = MagicMock() # receives (task_id, pid) once run_file launches the subprocess
+    result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir",
+                               tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
+                               tmpstroutfile, "INFO", callback_method, "1-1")
     self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
                                'structuredOut': {}})
-
+    self.assertTrue(callback_method.called)
 
   def test_is_successfull(self):
     executor = PythonExecutor("/tmp", AmbariConfig().getConfig())