You are viewing a plain-text version of this content. (The original page linked to a canonical version; that hyperlink was not preserved in this plain-text copy.)
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/06/27 14:05:06 UTC
incubator-ariatosca git commit: ARIA-285 Cancel execution may leave running processes [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes 26b6c6972 -> 1de1a9585 (forced update)
ARIA-285 Cancel execution may leave running processes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1de1a958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1de1a958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1de1a958
Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes
Commit: 1de1a9585f0e9df9f148151b94a170cd7d78cbef
Parents: a75a3de
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 27 17:04:48 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/core/engine.py | 10 ++++
aria/orchestrator/workflows/executor/base.py | 9 +++-
aria/orchestrator/workflows/executor/celery.py | 16 +++---
aria/orchestrator/workflows/executor/process.py | 46 ++++++++++++++---
requirements.in | 1 +
requirements.txt | 1 +
.../orchestrator/workflows/executor/__init__.py | 7 +++
.../workflows/executor/test_process_executor.py | 52 +++++++++++++++++++-
tests/requirements.txt | 1 +
9 files changed, 125 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d52ae85..5a94df8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -66,13 +66,23 @@ class Engine(logger.LoggerMixin):
else:
time.sleep(0.1)
if cancel:
+ self._terminate_tasks(tasks_tracker.executing_tasks)
events.on_cancelled_workflow_signal.send(ctx)
else:
events.on_success_workflow_signal.send(ctx)
except BaseException as e:
+ # Cleanup any remaining tasks
+ self._terminate_tasks(tasks_tracker.executing_tasks)
events.on_failure_workflow_signal.send(ctx, exception=e)
raise
+ def _terminate_tasks(self, tasks):
+ for task in tasks:
+ try:
+ self._executors[task._executor].terminate(task.id)
+ except BaseException:
+ pass
+
@staticmethod
def cancel_execution(ctx):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..4cc4503 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,7 +25,7 @@ class BaseExecutor(logger.LoggerMixin):
"""
Base class for executors for running tasks
"""
- def _execute(self, task):
+ def _execute(self, ctx):
raise NotImplementedError
def execute(self, ctx):
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
"""
pass
+ def terminate(self, ctx):
+ """
+ Terminate the executing task
+ :return:
+ """
+ pass
+
@staticmethod
def _task_started(ctx):
events.start_task_signal.send(ctx)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 9d66d26..46b15fd 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -42,15 +42,15 @@ class CeleryExecutor(BaseExecutor):
self._receiver_thread.start()
self._started_queue.get(timeout=30)
- def _execute(self, task):
- self._tasks[task.id] = task
- arguments = dict(arg.unwrapped for arg in task.arguments.values())
- arguments['ctx'] = task.context
- self._results[task.id] = self._app.send_task(
- task.operation_mapping,
+ def _execute(self, ctx):
+ self._tasks[ctx.id] = ctx
+ arguments = dict(arg.unwrapped for arg in ctx.arguments.values())
+ arguments['ctx'] = ctx.context
+ self._results[ctx.id] = self._app.send_task(
+ ctx.operation_mapping,
kwargs=arguments,
- task_id=task.id,
- queue=self._get_queue(task))
+ task_id=ctx.id,
+ queue=self._get_queue(ctx))
def close(self):
self._stopped = True
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8518b33..59e61b6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,10 @@ import sys
# As part of the process executor implementation, subprocess are started with this module as their
# entry point. We thus remove this module's directory from the python path if it happens to be
# there
+
+import signal
+from collections import namedtuple
+
script_dir = os.path.dirname(__file__)
if script_dir in sys.path:
sys.path.remove(script_dir)
@@ -39,6 +43,7 @@ import tempfile
import Queue
import pickle
+import psutil
import jsonpickle
import aria
@@ -57,6 +62,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
'Some changes failed writing to storage. For more info refer to the log.'
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
class ProcessExecutor(base.BaseExecutor):
"""
Executor which runs tasks in a subprocess environment
@@ -113,9 +121,26 @@ class ProcessExecutor(base.BaseExecutor):
self._server_socket.close()
self._listener_thread.join(timeout=60)
+ for task_id in self._tasks:
+ self.terminate(task_id)
+
+ def terminate(self, task_id):
+ task = self._tasks.get(task_id)
+ # The process might have managed to finish, thus it would not be in the tasks list
+ if task:
+ try:
+ parent_process = psutil.Process(task.proc.pid)
+ for child_process in reversed(parent_process.children(recursive=True)):
+ try:
+ child_process.send_signal(signal.SIGKILL)
+ except BaseException:
+ pass
+ parent_process.send_signal(signal.SIGKILL)
+ except BaseException:
+ pass
+
def _execute(self, ctx):
self._check_closed()
- self._tasks[ctx.task.id] = ctx
# Temporary file used to pass arguments to the started subprocess
file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,10 +150,15 @@ class ProcessExecutor(base.BaseExecutor):
env = self._construct_subprocess_env(task=ctx.task)
# Asynchronously start the operation in a subprocess
- subprocess.Popen(
- '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
- env=env,
- shell=True)
+ proc = subprocess.Popen(
+ [
+ sys.executable,
+ os.path.expanduser(os.path.expandvars(__file__)),
+ os.path.expanduser(os.path.expandvars(arguments_json_path))
+ ],
+ env=env)
+
+ self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
def _remove_task(self, task_id):
return self._tasks.pop(task_id)
@@ -191,15 +221,15 @@ class ProcessExecutor(base.BaseExecutor):
_send_message(connection, response)
def _handle_task_started_request(self, task_id, **kwargs):
- self._task_started(self._tasks[task_id])
+ self._task_started(self._tasks[task_id].ctx)
def _handle_task_succeeded_request(self, task_id, **kwargs):
task = self._remove_task(task_id)
- self._task_succeeded(task)
+ self._task_succeeded(task.ctx)
def _handle_task_failed_request(self, task_id, request, **kwargs):
task = self._remove_task(task_id)
- self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
+ self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback'])
def _send_message(connection, message):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index cecc9fd..723ed51 100644
--- a/requirements.in
+++ b/requirements.in
@@ -33,6 +33,7 @@ PrettyTable>=0.7,<0.8
click_didyoumean==0.0.3
backports.shutil_get_terminal_size==1.0.0
logutils==0.3.4.1
+psutil>=5.2.2, < 6.0.0
importlib ; python_version < '2.7'
ordereddict ; python_version < '2.7'
total-ordering ; python_version < '2.7' # only one version on pypi
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 9f929a9..7ee1008 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,6 +26,7 @@ networkx==1.9.1
ordereddict==1.1 ; python_version < "2.7"
packaging==16.8 # via setuptools
prettytable==0.7.2
+psutil==5.2.2
pyparsing==2.2.0 # via packaging
requests==2.13.0
retrying==1.3.3
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,13 @@ from contextlib import contextmanager
import aria
from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
class MockContext(object):
+ INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
def __init__(self, storage, task_kwargs=None):
self.logger = logging.getLogger('mock_logger')
self._task_kwargs = task_kwargs or {}
@@ -46,6 +49,10 @@ class MockContext(object):
def close(self):
pass
+ @property
+ def model(self):
+ return self._storage
+
@classmethod
def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..6f5c827 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -14,17 +14,24 @@
# limitations under the License.
import os
+import time
import Queue
+import subprocess
import pytest
+import psutil
+import retrying
import aria
+from aria import operation
+from aria.modeling import models
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process
import tests.storage
import tests.resources
+from tests.helpers import FilesystemDataHolder
from tests.fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
@@ -71,10 +78,45 @@ class TestProcessExecutor(object):
executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
assert 'closed' in exc_info.value.message
+ def test_process_termination(self, executor, model, fs_test_holder):
+ argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+ model.argument.put(argument)
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(
+ function='{0}.{1}'.format(__name__, freezing_task.__name__),
+ arguments=dict(holder_path=argument)),
+ )
+
+ executor.execute(ctx)
+
+ @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+ def wait_for_extra_process_id():
+ return fs_test_holder.get('subproc', False)
+
+ pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()]
+ assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+ executor.terminate(ctx.task.id)
+
+ # Give a chance to the processes to terminate
+ time.sleep(2)
+ assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
+ for p in psutil.process_iter()
+ for pid in pids)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = FilesystemDataHolder(dataholder_path)
+ return holder
+
@pytest.fixture
def executor(plugin_manager):
- result = process.ProcessExecutor(plugin_manager=plugin_manager)
+ result = process.ProcessExecutor(
+ plugin_manager=plugin_manager,
+ python_path=[tests.ROOT_DIR])
yield result
result.close()
@@ -92,3 +134,11 @@ def model(tmpdir):
initiator_kwargs=dict(base_dir=str(tmpdir)))
yield _storage
tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+ holder = FilesystemDataHolder(holder_path)
+ holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+ while True:
+ time.sleep(5)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@
testtools
fasteners==0.13.0
sh==1.12.13
+psutil==5.2.2
mock==1.0.1
pylint==1.6.4
pytest==3.0.2