You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/06/25 09:19:07 UTC
incubator-ariatosca git commit: introduced process grouping
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes 75112ab05 -> e9ed005df
introduced process grouping
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e9ed005d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e9ed005d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e9ed005d
Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes
Commit: e9ed005dff47bf4cf056a94969b1274b5b3b4abd
Parents: 75112ab
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jun 25 12:19:02 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/core/engine.py | 12 +++++-
aria/orchestrator/workflows/executor/base.py | 7 ++++
aria/orchestrator/workflows/executor/process.py | 29 +++++++++++---
.../orchestrator/workflows/executor/__init__.py | 7 ++++
.../workflows/executor/test_process_executor.py | 41 ++++++++++++++++++++
tests/requirements.txt | 1 +
6 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d5a6e70..02b1dee 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -49,7 +49,6 @@ class Engine(logger.LoggerMixin):
if resuming:
events.on_resume_workflow_signal.send(ctx)
-
try:
events.start_workflow_signal.send(ctx)
while True:
@@ -65,13 +64,22 @@ class Engine(logger.LoggerMixin):
else:
time.sleep(0.1)
if cancel:
- events.on_cancelled_workflow_signal.send(ctx)
+ try:
+ self._terminate_tasks(executing_tasks)
+ finally:
+ events.on_cancelled_workflow_signal.send(ctx)
else:
events.on_success_workflow_signal.send(ctx)
except BaseException as e:
+ # Cleanup any remaining tasks
+ self._terminate_tasks(executing_tasks)
events.on_failure_workflow_signal.send(ctx, exception=e)
raise
+ def _terminate_tasks(self, tasks):
+ for task in tasks:
+ self._executors[task._executor].terminate(task)
+
@staticmethod
def cancel_execution(ctx):
"""
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..8975154 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
"""
pass
+ def terminate(self):
+ """
+ Terminate the executing task
+ :return:
+ """
+ pass
+
@staticmethod
def _task_started(ctx):
events.start_task_signal.send(ctx)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8518b33..8a10ebb 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,10 @@ import sys
# As part of the process executor implementation, subprocess are started with this module as their
# entry point. We thus remove this module's directory from the python path if it happens to be
# there
+from collections import namedtuple
+
+import signal
+
script_dir = os.path.dirname(__file__)
if script_dir in sys.path:
sys.path.remove(script_dir)
@@ -57,6 +61,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
'Some changes failed writing to storage. For more info refer to the log.'
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
class ProcessExecutor(base.BaseExecutor):
"""
Executor which runs tasks in a subprocess environment
@@ -113,9 +120,18 @@ class ProcessExecutor(base.BaseExecutor):
self._server_socket.close()
self._listener_thread.join(timeout=60)
+ def terminate(self, ctx):
+ task = self._tasks.get(ctx.task.id)
+ # The process might have already finished, in which case it is no longer in the tasks list
+ if task:
+ if os.getsid(os.getpid()) != os.getpgid(task.proc.pid):
+ # If the above condition is false, the task's process group is led by the leader
+ # of the current session, and killing that group would kill the entire
+ # OS session.
+ os.killpg(os.getpgid(task.proc.pid), signal.SIGTERM)
+
def _execute(self, ctx):
self._check_closed()
- self._tasks[ctx.task.id] = ctx
# Temporary file used to pass arguments to the started subprocess
file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,11 +141,14 @@ class ProcessExecutor(base.BaseExecutor):
env = self._construct_subprocess_env(task=ctx.task)
# Asynchronously start the operation in a subprocess
- subprocess.Popen(
+ p = subprocess.Popen(
'{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
env=env,
+ preexec_fn=os.setsid,
shell=True)
+ self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=p)
+
def _remove_task(self, task_id):
return self._tasks.pop(task_id)
@@ -191,15 +210,15 @@ class ProcessExecutor(base.BaseExecutor):
_send_message(connection, response)
def _handle_task_started_request(self, task_id, **kwargs):
- self._task_started(self._tasks[task_id])
+ self._task_started(self._tasks[task_id].ctx)
def _handle_task_succeeded_request(self, task_id, **kwargs):
task = self._remove_task(task_id)
- self._task_succeeded(task)
+ self._task_succeeded(task.ctx)
def _handle_task_failed_request(self, task_id, request, **kwargs):
task = self._remove_task(task_id)
- self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
+ self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback'])
def _send_message(connection, message):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,13 @@ from contextlib import contextmanager
import aria
from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
class MockContext(object):
+ INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
def __init__(self, storage, task_kwargs=None):
self.logger = logging.getLogger('mock_logger')
self._task_kwargs = task_kwargs or {}
@@ -46,6 +49,10 @@ class MockContext(object):
def close(self):
pass
+ @property
+ def model(self):
+ return self._storage
+
@classmethod
def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..591305a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -15,16 +15,22 @@
import os
import Queue
+import subprocess
import pytest
+import time
+import psutil
import aria
+from aria import operation
+from aria.modeling import models
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process
import tests.storage
import tests.resources
+from tests.helpers import FilesystemDataHolder
from tests.fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
@@ -71,6 +77,33 @@ class TestProcessExecutor(object):
executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
assert 'closed' in exc_info.value.message
+ def test_process_termination(self, executor, model, fs_test_holder):
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(
+ function='{0}.{1}'.format(__name__, freezing_task.__name__),
+ arguments=dict(holder_path=models.Argument.wrap('holder_path',
+ fs_test_holder._path))),
+ )
+
+ executor.execute(ctx)
+
+ while fs_test_holder.get('subproc', None) is None:
+ time.sleep(1)
+ pids = [executor._tasks[ctx.task.id].proc.pid, fs_test_holder['subproc']]
+ assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+ executor.terminate(ctx)
+ assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
+ for p in psutil.process_iter()
+ for pid in pids)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+ dataholder_path = str(tmpdir.join('dataholder'))
+ holder = FilesystemDataHolder(dataholder_path)
+ return holder
+
@pytest.fixture
def executor(plugin_manager):
@@ -92,3 +125,11 @@ def model(tmpdir):
initiator_kwargs=dict(base_dir=str(tmpdir)))
yield _storage
tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+ holder = FilesystemDataHolder(holder_path)
+ holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+ while True:
+ time.sleep(5)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@
testtools
fasteners==0.13.0
sh==1.12.13
+psutil==5.2.2
mock==1.0.1
pylint==1.6.4
pytest==3.0.2