Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/06/27 14:05:06 UTC

incubator-ariatosca git commit: ARIA-285 Cancel execution may leave running processes [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes 26b6c6972 -> 1de1a9585 (forced update)


ARIA-285 Cancel execution may leave running processes
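
A minimal standalone sketch of the termination approach taken below (the
helper name kill_process_tree is illustrative only and does not appear in
the diff): on cancel or failure the engine asks each executor to terminate
its still-executing tasks, and the process executor uses psutil (added to
the requirements by this commit) to SIGKILL the task's entire process tree,
deepest descendants first. The sketch catches the narrower psutil.Error
where the diff swallows BaseException.

    import signal

    import psutil

    def kill_process_tree(pid):
        """Send SIGKILL to every descendant of ``pid``, then to ``pid`` itself."""
        try:
            parent = psutil.Process(pid)
        except psutil.NoSuchProcess:
            return  # the task process already finished on its own
        # children(recursive=True) lists parents before their descendants,
        # so iterating in reverse signals the deepest processes first
        for child in reversed(parent.children(recursive=True)):
            try:
                child.send_signal(signal.SIGKILL)
            except psutil.Error:
                pass  # the child may have exited in the meantime
        try:
            parent.send_signal(signal.SIGKILL)
        except psutil.Error:
            pass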


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1de1a958
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1de1a958
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1de1a958

Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes
Commit: 1de1a9585f0e9df9f148151b94a170cd7d78cbef
Parents: a75a3de
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 27 17:04:48 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py      | 10 ++++
 aria/orchestrator/workflows/executor/base.py    |  9 +++-
 aria/orchestrator/workflows/executor/celery.py  | 16 +++---
 aria/orchestrator/workflows/executor/process.py | 46 ++++++++++++++---
 requirements.in                                 |  1 +
 requirements.txt                                |  1 +
 .../orchestrator/workflows/executor/__init__.py |  7 +++
 .../workflows/executor/test_process_executor.py | 52 +++++++++++++++++++-
 tests/requirements.txt                          |  1 +
 9 files changed, 125 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d52ae85..5a94df8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -66,13 +66,23 @@ class Engine(logger.LoggerMixin):
                 else:
                     time.sleep(0.1)
             if cancel:
+                self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
             else:
                 events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
+            # Clean up any remaining tasks
+            self._terminate_tasks(tasks_tracker.executing_tasks)
             events.on_failure_workflow_signal.send(ctx, exception=e)
             raise
 
+    def _terminate_tasks(self, tasks):
+        for task in tasks:
+            try:
+                self._executors[task._executor].terminate(task.id)
+            except BaseException:
+                pass
+
     @staticmethod
     def cancel_execution(ctx):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..4cc4503 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,7 +25,7 @@ class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
     """
-    def _execute(self, task):
+    def _execute(self, ctx):
         raise NotImplementedError
 
     def execute(self, ctx):
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
+    def terminate(self, ctx):
+        """
+        Terminate the executing task
+        :return:
+        """
+        pass
+
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 9d66d26..46b15fd 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -42,15 +42,15 @@ class CeleryExecutor(BaseExecutor):
         self._receiver_thread.start()
         self._started_queue.get(timeout=30)
 
-    def _execute(self, task):
-        self._tasks[task.id] = task
-        arguments = dict(arg.unwrapped for arg in task.arguments.values())
-        arguments['ctx'] = task.context
-        self._results[task.id] = self._app.send_task(
-            task.operation_mapping,
+    def _execute(self, ctx):
+        self._tasks[ctx.id] = ctx
+        arguments = dict(arg.unwrapped for arg in ctx.arguments.values())
+        arguments['ctx'] = ctx.context
+        self._results[ctx.id] = self._app.send_task(
+            ctx.operation_mapping,
             kwargs=arguments,
-            task_id=task.id,
-            queue=self._get_queue(task))
+            task_id=ctx.id,
+            queue=self._get_queue(ctx))
 
     def close(self):
         self._stopped = True

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8518b33..59e61b6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,10 @@ import sys
 # As part of the process executor implementation, subprocess are started with this module as their
 # entry point. We thus remove this module's directory from the python path if it happens to be
 # there
+
+import signal
+from collections import namedtuple
+
 script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
@@ -39,6 +43,7 @@ import tempfile
 import Queue
 import pickle
 
+import psutil
 import jsonpickle
 
 import aria
@@ -57,6 +62,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
     'Some changes failed writing to storage. For more info refer to the log.'
 
 
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
 class ProcessExecutor(base.BaseExecutor):
     """
     Executor which runs tasks in a subprocess environment
@@ -113,9 +121,26 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
+        for task_id in self._tasks:
+            self.terminate(task_id)
+
+    def terminate(self, task_id):
+        task = self._tasks.get(task_id)
+        # The process might have managed to finish, thus it would not be in the tasks list
+        if task:
+            try:
+                parent_process = psutil.Process(task.proc.pid)
+                for child_process in reversed(parent_process.children(recursive=True)):
+                    try:
+                        child_process.send_signal(signal.SIGKILL)
+                    except BaseException:
+                        pass
+                parent_process.send_signal(signal.SIGKILL)
+            except BaseException:
+                pass
+
     def _execute(self, ctx):
         self._check_closed()
-        self._tasks[ctx.task.id] = ctx
 
         # Temporary file used to pass arguments to the started subprocess
         file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,10 +150,15 @@ class ProcessExecutor(base.BaseExecutor):
 
         env = self._construct_subprocess_env(task=ctx.task)
         # Asynchronously start the operation in a subprocess
-        subprocess.Popen(
-            '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
-            env=env,
-            shell=True)
+        proc = subprocess.Popen(
+            [
+                sys.executable,
+                os.path.expanduser(os.path.expandvars(__file__)),
+                os.path.expanduser(os.path.expandvars(arguments_json_path))
+            ],
+            env=env)
+
+        self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
 
     def _remove_task(self, task_id):
         return self._tasks.pop(task_id)
@@ -191,15 +221,15 @@ class ProcessExecutor(base.BaseExecutor):
                 _send_message(connection, response)
 
     def _handle_task_started_request(self, task_id, **kwargs):
-        self._task_started(self._tasks[task_id])
+        self._task_started(self._tasks[task_id].ctx)
 
     def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        self._task_succeeded(task)
+        self._task_succeeded(task.ctx)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
+        self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback'])
 
 
 def _send_message(connection, message):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index cecc9fd..723ed51 100644
--- a/requirements.in
+++ b/requirements.in
@@ -33,6 +33,7 @@ PrettyTable>=0.7,<0.8
 click_didyoumean==0.0.3
 backports.shutil_get_terminal_size==1.0.0
 logutils==0.3.4.1
+psutil>=5.2.2, < 6.0.0
 importlib ; python_version < '2.7'
 ordereddict ; python_version < '2.7'
 total-ordering ; python_version < '2.7'  # only one version on pypi

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 9f929a9..7ee1008 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,6 +26,7 @@ networkx==1.9.1
 ordereddict==1.1 ; python_version < "2.7"
 packaging==16.8           # via setuptools
 prettytable==0.7.2
+psutil==5.2.2
 pyparsing==2.2.0          # via packaging
 requests==2.13.0
 retrying==1.3.3

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,13 @@ from contextlib import contextmanager
 
 import aria
 from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
 
 
 class MockContext(object):
 
+    INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
     def __init__(self, storage, task_kwargs=None):
         self.logger = logging.getLogger('mock_logger')
         self._task_kwargs = task_kwargs or {}
@@ -46,6 +49,10 @@ class MockContext(object):
     def close(self):
         pass
 
+    @property
+    def model(self):
+        return self._storage
+
     @classmethod
     def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
         return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..6f5c827 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -14,17 +14,24 @@
 # limitations under the License.
 
 import os
+import time
 import Queue
+import subprocess
 
 import pytest
+import psutil
+import retrying
 
 import aria
+from aria import operation
+from aria.modeling import models
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
 
 import tests.storage
 import tests.resources
+from tests.helpers import FilesystemDataHolder
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -71,10 +78,45 @@ class TestProcessExecutor(object):
             executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
+    def test_process_termination(self, executor, model, fs_test_holder):
+        argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        model.argument.put(argument)
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(
+                function='{0}.{1}'.format(__name__, freezing_task.__name__),
+                arguments=dict(holder_path=argument)),
+        )
+
+        executor.execute(ctx)
+
+        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+        def wait_for_extra_process_id():
+            return fs_test_holder.get('subproc', False)
+
+        pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()]
+        assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+        executor.terminate(ctx.task.id)
+
+        # Give the processes a chance to terminate
+        time.sleep(2)
+        assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
+                       for p in psutil.process_iter()
+                       for pid in pids)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = FilesystemDataHolder(dataholder_path)
+    return holder
+
 
 @pytest.fixture
 def executor(plugin_manager):
-    result = process.ProcessExecutor(plugin_manager=plugin_manager)
+    result = process.ProcessExecutor(
+        plugin_manager=plugin_manager,
+        python_path=[tests.ROOT_DIR])
     yield result
     result.close()
 
@@ -92,3 +134,11 @@ def model(tmpdir):
                                               initiator_kwargs=dict(base_dir=str(tmpdir)))
     yield _storage
     tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+    holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+    while True:
+        time.sleep(5)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1de1a958/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@
 testtools
 fasteners==0.13.0
 sh==1.12.13
+psutil==5.2.2
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2