You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/06/25 09:19:07 UTC

incubator-ariatosca git commit: introduced process grouping

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes 75112ab05 -> e9ed005df


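introduced process grouping: each task subprocess now runs in its own session/process group, so that cancelling or failing a workflow execution also terminates it along with any processes it spawned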


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e9ed005d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e9ed005d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e9ed005d

Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes
Commit: e9ed005dff47bf4cf056a94969b1274b5b3b4abd
Parents: 75112ab
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jun 25 12:19:02 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py      | 12 +++++-
 aria/orchestrator/workflows/executor/base.py    |  7 ++++
 aria/orchestrator/workflows/executor/process.py | 29 +++++++++++---
 .../orchestrator/workflows/executor/__init__.py |  7 ++++
 .../workflows/executor/test_process_executor.py | 41 ++++++++++++++++++++
 tests/requirements.txt                          |  1 +
 6 files changed, 90 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d5a6e70..02b1dee 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -65,13 +65,33 @@ class Engine(logger.LoggerMixin):
                 else:
                     time.sleep(0.1)
             if cancel:
-                events.on_cancelled_workflow_signal.send(ctx)
+                try:
+                    self._terminate_tasks(executing_tasks)
+                finally:
+                    events.on_cancelled_workflow_signal.send(ctx)
             else:
                 events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
+            # Terminate any tasks that are still executing
+            self._terminate_tasks(executing_tasks)
             events.on_failure_workflow_signal.send(ctx, exception=e)
             raise
 
+    def _terminate_tasks(self, tasks):
+        """
+        Best-effort termination of the given executing tasks.
+
+        An ``Exception`` raised while terminating one task is logged; it neither prevents the
+        remaining tasks from being terminated nor replaces the exception that triggered cleanup.
+        """
+        for task in tasks:
+            # NOTE(review): ``task`` exposes ``_executor`` (it looks like a model task), while
+            # ProcessExecutor.terminate reads ``ctx.task.id``; confirm the argument types match.
+            try:
+                self._executors[task._executor].terminate(task)
+            except Exception:
+                self.logger.exception('Failed to terminate task %s', task)
+
     @staticmethod
     def cancel_execution(ctx):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..8975154 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
+    def terminate(self, ctx):
+        """
+        Terminate the executing task of ``ctx``; executors that cannot do so keep this no-op.
+        :param ctx: context of the task to terminate
+        """
+        pass
+
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8518b33..8a10ebb 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,8 @@ import sys
+import signal
+
 # As part of the process executor implementation, subprocess are started with this module as their
 # entry point. We thus remove this module's directory from the python path if it happens to be
 # there
 script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
@@ -57,6 +59,19 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
     'Some changes failed writing to storage. For more info refer to the log.'
 
 
+class _Task(object):
+    """
+    Book-keeping for a task run by the executor: its operation context and the handle of the
+    subprocess running it (``None`` until the subprocess has been started).
+    """
+
+    __slots__ = ('proc', 'ctx')
+
+    def __init__(self, proc, ctx):
+        self.proc = proc
+        self.ctx = ctx
+
+
 class ProcessExecutor(base.BaseExecutor):
     """
     Executor which runs tasks in a subprocess environment
@@ -113,9 +128,34 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
+    def terminate(self, ctx):
+        """
+        Terminate the subprocess running the task of ``ctx``, along with its process group.
+
+        Termination is best-effort: a task that is no longer tracked (e.g. it has already
+        finished), or whose process group no longer exists, is skipped.
+        """
+        task = self._tasks.get(ctx.task.id)
+        if task is None or task.proc is None:
+            return
+        try:
+            # The subprocess was started with os.setsid (see _execute), so it leads its own
+            # process group; signalling the group also reaches the processes it spawned.
+            pgid = os.getpgid(task.proc.pid)
+            # Never signal the process group leading our own session: that would terminate
+            # the entire OS session.
+            if pgid != os.getsid(os.getpid()):
+                os.killpg(pgid, signal.SIGTERM)
+        except OSError:
+            # The process (group) has already exited
+            pass
+
     def _execute(self, ctx):
         self._check_closed()
-        self._tasks[ctx.task.id] = ctx
+        # Register the task before starting its subprocess, which may report back (e.g. that
+        # the task has started) before Popen returns; the process handle is set below.
+        task_entry = _Task(proc=None, ctx=ctx)
+        self._tasks[ctx.task.id] = task_entry
 
         # Temporary file used to pass arguments to the started subprocess
         file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,11 +165,14 @@ class ProcessExecutor(base.BaseExecutor):
 
         env = self._construct_subprocess_env(task=ctx.task)
         # Asynchronously start the operation in a subprocess
-        subprocess.Popen(
+        task_entry.proc = subprocess.Popen(
             '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
             env=env,
+            # Run the subprocess in a new session, and thus a new process group, so that
+            # terminate() can signal it together with any processes it spawns
+            preexec_fn=os.setsid,
             shell=True)
 
     def _remove_task(self, task_id):
         return self._tasks.pop(task_id)
 
@@ -191,15 +234,15 @@ class ProcessExecutor(base.BaseExecutor):
                 _send_message(connection, response)
 
     def _handle_task_started_request(self, task_id, **kwargs):
-        self._task_started(self._tasks[task_id])
+        self._task_started(self._tasks[task_id].ctx)
 
     def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        self._task_succeeded(task)
+        self._task_succeeded(task.ctx)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
+        self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback'])
 
 
 def _send_message(connection, message):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,14 @@ from contextlib import contextmanager
 
 import aria
 from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
 
 
 class MockContext(object):
 
+    # Mirror the instrumentation fields of the real contexts (presumably read by the executor)
+    INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
     def __init__(self, storage, task_kwargs=None):
         self.logger = logging.getLogger('mock_logger')
         self._task_kwargs = task_kwargs or {}
@@ -46,6 +50,11 @@ class MockContext(object):
     def close(self):
         pass
 
+    @property
+    def model(self):
+        """The storage backing this mock (presumably mirrors a real context's ``model``)."""
+        return self._storage
+
     @classmethod
     def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
         return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..591305a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -15,16 +15,22 @@
 
 import os
 import Queue
+import subprocess
+import time
 
+import psutil
 import pytest
 
 import aria
+from aria import operation
+from aria.modeling import models
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
 
 import tests.storage
 import tests.resources
+from tests.helpers import FilesystemDataHolder
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -71,6 +77,48 @@ class TestProcessExecutor(object):
             executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
+    def test_process_termination(self, executor, model, fs_test_holder):
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(
+                function='{0}.{1}'.format(__name__, freezing_task.__name__),
+                arguments=dict(holder_path=models.Argument.wrap('holder_path',
+                                                                fs_test_holder._path))),
+        )
+
+        executor.execute(ctx)
+
+        # Wait (bounded) until the task reports the pid of the process it spawned
+        deadline = time.time() + 30
+        while fs_test_holder.get('subproc', None) is None:
+            assert time.time() < deadline, 'The task did not start in time'
+            time.sleep(0.5)
+        pids = [executor._tasks[ctx.task.id].proc.pid, fs_test_holder['subproc']]
+        assert all(_is_running(pid) for pid in pids)
+
+        executor.terminate(ctx)
+
+        # Signals are delivered asynchronously; give the processes time to exit
+        deadline = time.time() + 10
+        while any(_is_running(pid) for pid in pids):
+            assert time.time() < deadline, 'The task processes were not terminated'
+            time.sleep(0.1)
+
+
+def _is_running(pid):
+    """Return whether a live (i.e. non-zombie) process with the given pid exists."""
+    try:
+        return psutil.Process(pid).status() != psutil.STATUS_ZOMBIE
+    except psutil.NoSuchProcess:
+        return False
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = FilesystemDataHolder(dataholder_path)
+    return holder
+
 
 @pytest.fixture
 def executor(plugin_manager):
@@ -92,3 +140,11 @@ def model(tmpdir):
                                               initiator_kwargs=dict(base_dir=str(tmpdir)))
     yield _storage
     tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+    holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+    while True:
+        time.sleep(5)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e9ed005d/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@
 testtools
 fasteners==0.13.0
 sh==1.12.13
+psutil==5.2.2
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2