You are viewing a plain text version of this content. (The link to its canonical version was not preserved in this plain-text copy.)
Posted to commits@ambari.apache.org by ds...@apache.org on 2014/08/07 16:16:04 UTC
git commit: AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen)
Repository: ambari
Updated Branches:
refs/heads/trunk 0f24bc24b -> 5bcd37529
AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5bcd3752
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5bcd3752
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5bcd3752
Branch: refs/heads/trunk
Commit: 5bcd375294cc072665097187c76199696859c00b
Parents: 0f24bc2
Author: Dmytro Sen <ds...@hortonworks.com>
Authored: Thu Aug 7 17:15:54 2014 +0300
Committer: Dmytro Sen <ds...@hortonworks.com>
Committed: Thu Aug 7 17:15:54 2014 +0300
----------------------------------------------------------------------
.../src/main/python/ambari_agent/ActionQueue.py | 33 ++++++++
.../src/main/python/ambari_agent/Controller.py | 18 ++++-
.../ambari_agent/CustomServiceOrchestrator.py | 28 ++++++-
.../main/python/ambari_agent/PythonExecutor.py | 7 +-
.../test/python/ambari_agent/TestActionQueue.py | 39 +++++++++
.../test/python/ambari_agent/TestController.py | 6 ++
.../TestCustomServiceOrchestrator.py | 84 +++++++++++++++++++-
.../python/ambari_agent/TestPythonExecutor.py | 18 +++--
8 files changed, 220 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 58d3e75..d3aad6e 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -108,6 +108,32 @@ class ActionQueue(threading.Thread):
logger.debug(pprint.pformat(command))
self.commandQueue.put(command)
+  def cancel(self, commands):
+    """
+    Cancel execution commands that are still queued or already running.
+
+    Each element of `commands` is a cancel request holding 'target_task_id'
+    (the taskId of the execution command to cancel) and 'reason'. Matching
+    commands still waiting in commandQueue are dropped, then the orchestrator
+    is asked to kill the task in case it is already executing.
+    """
+    for command in commands:
+      task_id = command['target_task_id']
+      reason = command['reason']
+
+      logger.info("Canceling command %s", task_id)
+      logger.debug(pprint.pformat(command))
+
+      # Filter the pending commands in place under the queue's own lock (the
+      # same technique reset() uses) instead of swapping in a new Queue and
+      # draining the old one: empty() followed by get(False) is not atomic,
+      # so get(False) raises Queue.Empty if another thread takes the last
+      # item in between, and anything still holding the old Queue object
+      # would never see commands put into the replacement.
+      # Execution commands identify themselves by 'taskId', not 'task_id'.
+      queue = self.commandQueue
+      canceled = []
+      with queue.mutex:
+        remaining = []
+        for queued_command in queue.queue:
+          if queued_command.get('taskId') == task_id:
+            canceled.append(queued_command)
+          else:
+            remaining.append(queued_command)
+        queue.queue.clear()
+        queue.queue.extend(remaining)
+
+      # Log outside the lock so the worker thread is not kept waiting
+      for queued_command in canceled:
+        logger.info("Canceling %s for service %s of cluster %s (taskId %s), "
+                    "removed from the queue.",
+                    queued_command.get('commandType'),
+                    queued_command.get('serviceName'),
+                    queued_command.get('clusterName'),
+                    task_id)
+
+      # Kill if in progress
+      self.customServiceOrchestrator.cancel_command(task_id, reason)
+
def run(self):
while not self.stopped():
while not self.statusCommandQueue.empty():
@@ -287,3 +313,10 @@ class ActionQueue(threading.Thread):
Actions that are executed every time when command status changes
"""
self.controller.heartbeat_wait_event.set()
+
+  def reset(self):
+    """Discard every command still waiting in commandQueue."""
+    pending = self.commandQueue
+    with pending.mutex:
+      pending.queue.clear()
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index 36ed94c..87af939 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -78,7 +78,7 @@ class Controller(threading.Thread):
def __del__(self):
logger.info("Server connection disconnected.")
pass
-
+
def registerWithServer(self):
LiveStatus.SERVICES = []
LiveStatus.CLIENT_COMPONENTS = []
@@ -142,7 +142,12 @@ class Controller(threading.Thread):
pass
return ret
-
+  def cancelCommandInQueue(self, commands):
+    """
+    Hand the server's cancel requests over to the action queue, which drops
+    matching queued commands and kills the ones already running.
+    An empty or missing list of requests is ignored.
+    """
+    if not commands:
+      return
+    self.actionQueue.cancel(commands)
+
def addToQueue(self, commands):
"""Add to the queue for running the commands """
""" Put the required actions into the Queue """
@@ -223,6 +228,10 @@ class Controller(threading.Thread):
else:
self.responseId=serverId
+ if 'cancelCommands' in response.keys():
+ self.cancelCommandInQueue(response['cancelCommands'])
+ pass
+
if 'executionCommands' in response.keys():
self.addToQueue(response['executionCommands'])
pass
@@ -309,6 +318,11 @@ class Controller(threading.Thread):
logger.info("Registration response from %s was %s", self.serverHostname, message)
if self.isRegistered:
+ # Clearing command queue to stop executing "stale" commands
+ # after registration
+ logger.info('Resetting ActionQueue...')
+ self.actionQueue.reset()
+
# Process callbacks
for callback in self.registration_listeners:
callback()
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 4331678..e13e543 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -22,6 +22,7 @@ import logging
import os
import json
import sys
+import shell
from FileCache import FileCache
from AgentException import AgentException
@@ -68,7 +69,19 @@ class CustomServiceOrchestrator():
os.unlink(self.status_commands_stderr)
except OSError:
pass # Ignore fail
+ self.commands_in_progress = {}
+
+  def map_task_to_process(self, task_id, processId):
+    """
+    Callback handed to PythonExecutor.run_file: remember which process is
+    executing `task_id` so cancel_command can kill it.
+
+    If the task was canceled while no process was running for it (e.g. in
+    the gap between two scripts of the same command), its entry already
+    holds the cancel reason instead of a pid. Overwriting it would silently
+    drop the cancellation, so the freshly launched process is killed instead
+    and the reason is kept for runCommand to report.
+    """
+    current = self.commands_in_progress.get(task_id)
+    if current is not None and not isinstance(current, int):
+      shell.kill_process_with_children(processId)
+      return
+    self.commands_in_progress[task_id] = processId
+
+  def cancel_command(self, task_id, reason):
+    """
+    Kill the process currently executing `task_id` and record `reason` so
+    runCommand can append it to the command output.
+
+    Requests for tasks that are not in progress, or that were already
+    canceled, are ignored.
+    """
+    pid = self.commands_in_progress.get(task_id)
+    if not isinstance(pid, int):
+      # None: the task is not running. Any other non-int: it was already
+      # canceled and the entry holds the previous reason, which must not be
+      # handed to kill_process_with_children as if it were a pid.
+      return
+    # Store the reason as text: a non-int entry is what marks the task as
+    # canceled ("%s" also keeps unicode reasons intact under Python 2).
+    self.commands_in_progress[task_id] = "%s" % (reason,)
+    logger.info("Canceling command with task_id - %s, reason - %s . "
+                "Killing process %s", task_id, reason, pid)
+    shell.kill_process_with_children(pid)
+
def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None,
override_output_files = True):
@@ -132,7 +145,8 @@ class CustomServiceOrchestrator():
script_params = [command_name, json_path, current_base_dir]
ret = self.python_executor.run_file(py_file, script_params,
self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout,
- tmpstrucoutfile, logger_level, override_output_files)
+ tmpstrucoutfile, logger_level, self.map_task_to_process,
+ task_id, override_output_files)
# Next run_file() invocations should always append to current output
override_output_files = False
if ret['exitcode'] != 0:
@@ -141,6 +155,18 @@ class CustomServiceOrchestrator():
if not ret: # Something went wrong
raise AgentException("No script has been executed")
+      # A canceled task has its cancel reason stored in place of the pid.
+      # The entry may be missing altogether if no process was ever
+      # registered for this task, hence pop() with a default.
+      pid = self.commands_in_progress.pop(task_id, None)
+      if pid is not None and not isinstance(pid, int):
+        reason = '\nCommand aborted. ' + pid
+        ret['stdout'] += reason
+        ret['stderr'] += reason
+
+        with open(tmpoutfile, "a") as f:
+          f.write(reason)
+        with open(tmperrfile, "a") as f:
+          f.write(reason)
+
except Exception: # We do not want to let agent fail completely
exc_type, exc_obj, exc_tb = sys.exc_info()
message = "Catched an exception while executing "\
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
index c4f1234..704e8f3 100644
--- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py
@@ -47,8 +47,9 @@ class PythonExecutor:
self.config = config
pass
- def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, timeout,
- tmpstructedoutfile, logger_level, override_output_files = True):
+ def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile,
+ timeout, tmpstructedoutfile, logger_level, callback, task_id,
+ override_output_files = True):
"""
Executes the specified python file in a separate subprocess.
Method returns only when the subprocess is finished.
@@ -77,6 +78,8 @@ class PythonExecutor:
pythonCommand = self.python_command(script, script_params)
logger.info("Running command " + pprint.pformat(pythonCommand))
process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr)
+ # map task_id to pid
+ callback(task_id, process.pid)
logger.debug("Launching watchdog thread")
self.event.clear()
self.python_process_has_been_killed = False
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
index 04c12b2..e06efe4 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py
@@ -487,3 +487,42 @@ class TestActionQueue(TestCase):
self.assertTrue(requestComponentStatus_mock.called)
self.assertEqual(len(report['componentStatus']), 1)
self.assertTrue(report['componentStatus'][0].has_key('alerts'))
+
+ @patch.object(ActionQueue, "process_command")
+ @patch.object(Queue, "get")
+ @patch.object(CustomServiceOrchestrator, "__init__")
+ def test_reset_queue(self, CustomServiceOrchestrator_mock,
+ get_mock, process_command_mock):
+ """reset() must leave commandQueue empty after two commands were put into it."""
+ CustomServiceOrchestrator_mock.return_value = None
+ dummy_controller = MagicMock()
+ config = MagicMock()
+ actionQueue = ActionQueue(config, dummy_controller)
+ actionQueue.start()
+ actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+ self.assertEqual(2, actionQueue.commandQueue.qsize())
+ actionQueue.reset()
+ self.assertTrue(actionQueue.commandQueue.empty())
+ # Give the started worker thread a moment before stopping and joining it
+ time.sleep(0.1)
+ actionQueue.stop()
+ actionQueue.join()
+ self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
+
+  @patch.object(CustomServiceOrchestrator, "__init__")
+  def test_cancel(self, CustomServiceOrchestrator_mock):
+    """
+    cancel() drops the matching queued command, keeps the others, and asks
+    the orchestrator to kill the task in case it is already running.
+    """
+    CustomServiceOrchestrator_mock.return_value = None
+    dummy_controller = MagicMock()
+    config = MagicMock()
+    actionQueue = ActionQueue(config, dummy_controller)
+    # The orchestrator's __init__ is patched out; replace it with a mock so
+    # the in-progress kill request can be observed.
+    actionQueue.customServiceOrchestrator = MagicMock()
+    actionQueue.put([self.datanode_install_command, self.hbase_install_command])
+    self.assertEqual(2, actionQueue.commandQueue.qsize())
+
+    canceled_task_id = self.datanode_install_command['taskId']
+    actionQueue.cancel([{'target_task_id': canceled_task_id,
+                         'reason': 'reason'}])
+
+    # Only the command that was not canceled is left waiting
+    self.assertEqual(1, actionQueue.commandQueue.qsize())
+    self.assertTrue(actionQueue.commandQueue.queue[0] is self.hbase_install_command)
+    actionQueue.customServiceOrchestrator.cancel_command.assert_called_once_with(
+      canceled_task_id, 'reason')
+
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestController.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py
index 6c1dcd0..9ec23db 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestController.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestController.py
@@ -236,6 +236,8 @@ class TestController(unittest.TestCase):
self.controller.registerWithServer = registerWithServer
heartbeatWithServer = MagicMock(name="heartbeatWithServer")
self.controller.heartbeatWithServer = heartbeatWithServer
+ actionQueue = MagicMock(name="actionQueue")
+ self.controller.actionQueue = actionQueue
Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception())
@@ -257,6 +259,8 @@ class TestController(unittest.TestCase):
self.controller.registerWithServer = registerWithServer
heartbeatWithServer = MagicMock(name="heartbeatWithServer")
self.controller.heartbeatWithServer = heartbeatWithServer
+ actionQueue = MagicMock(name="actionQueue")
+ self.controller.actionQueue = actionQueue
listener1 = MagicMock()
listener2 = MagicMock()
@@ -282,6 +286,8 @@ class TestController(unittest.TestCase):
self.controller.registerWithServer = registerWithServer
heartbeatWithServer = MagicMock(name="heartbeatWithServer")
self.controller.heartbeatWithServer = heartbeatWithServer
+ actionQueue = MagicMock(name="actionQueue")
+ self.controller.actionQueue = actionQueue
self.controller.isRegistered = True
self.controller.registerAndHeartbeat()
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
index e8668f6..d669cd2 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py
@@ -18,9 +18,11 @@ See the License for the specific language governing permissions and
limitations under the License.
'''
import ConfigParser
+from multiprocessing.pool import ThreadPool
import os
import pprint
+import shell
from unittest import TestCase
import threading
@@ -185,6 +187,8 @@ class TestCustomServiceOrchestrator(TestCase):
'/hooks_dir/prefix-command')
dummy_controller = MagicMock()
orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+ unix_process_id = 111
+ orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
get_hook_base_dir_mock.return_value = "/hooks/"
# normal run case
run_file_mock.return_value = {
@@ -208,9 +212,9 @@ class TestCustomServiceOrchestrator(TestCase):
ret = orchestrator.runCommand(command, "out.txt", "err.txt",
forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS)
## Check that override_output_files was true only during first call
- self.assertEquals(run_file_mock.call_args_list[0][0][8], True)
- self.assertEquals(run_file_mock.call_args_list[1][0][8], False)
- self.assertEquals(run_file_mock.call_args_list[2][0][8], False)
+ self.assertEquals(run_file_mock.call_args_list[0][0][10], True)
+ self.assertEquals(run_file_mock.call_args_list[1][0][10], False)
+ self.assertEquals(run_file_mock.call_args_list[2][0][10], False)
## Check that forced_command_name was taken into account
self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
CustomServiceOrchestrator.COMMAND_NAME_STATUS)
@@ -229,6 +233,78 @@ class TestCustomServiceOrchestrator(TestCase):
pass
+  @patch("shell.kill_process_with_children")
+  @patch.object(CustomServiceOrchestrator, "resolve_script_path")
+  @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
+  @patch.object(FileCache, "get_service_base_dir")
+  @patch.object(FileCache, "get_hook_base_dir")
+  @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
+  @patch.object(PythonExecutor, "run_file")
+  @patch.object(FileCache, "__init__")
+  def test_cancel_command(self, FileCache_mock,
+                          run_file_mock, dump_command_to_json_mock,
+                          get_hook_base_dir_mock, get_service_base_dir_mock,
+                          resolve_hook_script_path_mock, resolve_script_path_mock,
+                          kill_process_with_children_mock):
+    """
+    A cancel request arriving while the command's script runs kills the
+    registered process, appends the reason to the returned stdout/stderr
+    and to the output files, and clears the task's bookkeeping entry.
+    """
+    FileCache_mock.return_value = None
+    command = {
+      'role' : 'REGION_SERVER',
+      'hostLevelParams' : {
+        'stack_name' : 'HDP',
+        'stack_version' : '2.0.7',
+        'jdk_location' : 'some_location'
+      },
+      'commandParams': {
+        'script_type': 'PYTHON',
+        'script': 'scripts/hbase_regionserver.py',
+        'command_timeout': '600',
+        'service_package_folder' : 'HBASE'
+      },
+      'taskId' : '3',
+      'roleCommand': 'INSTALL'
+    }
+    get_service_base_dir_mock.return_value = "/basedir/"
+    resolve_script_path_mock.return_value = "/basedir/scriptpath"
+    resolve_hook_script_path_mock.return_value = \
+      ('/hooks_dir/prefix-command/scripts/hook.py',
+       '/hooks_dir/prefix-command')
+    dummy_controller = MagicMock()
+    orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+    unix_process_id = 111
+    orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
+    get_hook_base_dir_mock.return_value = "/hooks/"
+
+    def run_file_side_effect(*args, **kwargs):
+      # Deliver the cancel while the "script" is executing. Doing it from
+      # inside the mocked run_file makes the interleaving deterministic
+      # instead of racing one sleep() in a worker thread against another.
+      orchestrator.cancel_command(command['taskId'], 'reason')
+      return {
+        'stdout' : 'killed',
+        'stderr' : 'killed',
+        'exitcode': 1,
+      }
+    run_file_mock.side_effect = run_file_side_effect
+
+    # mkstemp() also returns an open descriptor; close it so it doesn't leak
+    out_fd, out = tempfile.mkstemp()
+    err_fd, err = tempfile.mkstemp()
+    os.close(out_fd)
+    os.close(err_fd)
+    try:
+      ret = orchestrator.runCommand(command, out, err)
+
+      self.assertEqual(ret['exitcode'], 1)
+      self.assertEqual(ret['stdout'], 'killed\nCommand aborted. reason')
+      self.assertEqual(ret['stderr'], 'killed\nCommand aborted. reason')
+
+      kill_process_with_children_mock.assert_called_once_with(unix_process_id)
+      self.assertFalse(command['taskId'] in orchestrator.commands_in_progress)
+      with open(out) as f:
+        self.assertTrue(f.read().endswith('\nCommand aborted. reason'))
+      with open(err) as f:
+        self.assertTrue(f.read().endswith('\nCommand aborted. reason'))
+    finally:
+      os.remove(out)
+      os.remove(err)
+
+
@patch.object(CustomServiceOrchestrator, "dump_command_to_json")
@patch.object(PythonExecutor, "run_file")
@patch.object(FileCache, "__init__")
@@ -252,6 +328,8 @@ class TestCustomServiceOrchestrator(TestCase):
}
dummy_controller = MagicMock()
orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
+ unix_process_id = 111
+ orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
# normal run case
run_file_mock.return_value = {
'stdout' : 'sss',
http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
index c33b97d..d1bec5d 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py
@@ -55,12 +55,15 @@ class TestPythonExecutor(TestCase):
runShellKillPgrp_method.side_effect = lambda python : python.terminate()
executor.runShellKillPgrp = runShellKillPgrp_method
subproc_mock.returncode = None
+ callback_method = MagicMock()
thread = Thread(target = executor.run_file, args = ("fake_puppetFile",
- ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout,"INFO"))
+ ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile,
+ PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1'))
thread.start()
time.sleep(0.1)
subproc_mock.finished_event.wait()
self.assertEquals(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout")
+ self.assertTrue(callback_method.called)
def test_watchdog_2(self):
@@ -83,16 +86,18 @@ class TestPythonExecutor(TestCase):
runShellKillPgrp_method.side_effect = lambda python : python.terminate()
executor.runShellKillPgrp = runShellKillPgrp_method
subproc_mock.returncode = 0
+ callback_method = MagicMock()
thread = Thread(target = executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"],
"/fake_tmp_dir", tmpoutfile, tmperrfile,
- PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO"))
+ PYTHON_TIMEOUT_SECONDS, tmpstrucout,
+ "INFO", callback_method, "1-1"))
thread.start()
time.sleep(0.1)
subproc_mock.should_finish_event.set()
subproc_mock.finished_event.wait()
self.assertEquals(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout")
self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout")
-
+ self.assertTrue(callback_method.called)
def test_execution_results(self):
subproc_mock = self.Subprocess_mockup()
@@ -112,10 +117,13 @@ class TestPythonExecutor(TestCase):
executor.runShellKillPgrp = runShellKillPgrp_method
subproc_mock.returncode = 0
subproc_mock.should_finish_event.set()
- result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile, "INFO")
+ callback_method = MagicMock()
+ result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir",
+ tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS,
+ tmpstroutfile, "INFO", callback_method, "1-1")
self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output',
'structuredOut': {}})
-
+ self.assertTrue(callback_method.called)
def test_is_successfull(self):
executor = PythonExecutor("/tmp", AmbariConfig().getConfig())