Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/13 10:33:49 UTC

[2/2] incubator-ariatosca git commit: ARIA-26 Plugin mechanism

ARIA-26 Plugin mechanism

Switch the CLI to the multiprocess executor and drop the blocking
executor. API tasks now resolve the plugin declared by an operation,
core tasks persist a plugin_id on the Task model, the multiprocess
executor runs operations against installed plugin prefixes, and a
wagon-based PluginManager utility is added.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5c3bee47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5c3bee47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5c3bee47

Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 5c3bee47d6acbb46235c07b0ff5b5be2460ccb1e
Parents: 04c9bd0
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Tue Dec 13 12:32:04 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |   4 +-
 aria/orchestrator/workflows/api/task.py         |  42 +++++---
 aria/orchestrator/workflows/core/task.py        |  10 +-
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 .../orchestrator/workflows/executor/blocking.py |  36 -------
 .../workflows/executor/multiprocess.py          |  86 ++++++++++------
 aria/orchestrator/workflows/executor/thread.py  |   3 +-
 aria/storage/models.py                          |   5 +
 aria/utils/plugin.py                            |  72 +++++++++++++
 requirements.txt                                |   3 +-
 .../workflows/executor/test_executor.py         | 103 +++++++++----------
 11 files changed, 225 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 141da07..825be68 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -34,7 +34,7 @@ from ..logger import LoggerMixin
 from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
 from ..orchestrator.context.workflow import WorkflowContext
 from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor
 from ..parser import iter_specifications
 from ..parser.consumption import (
     ConsumptionContext,
@@ -252,7 +252,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = ThreadExecutor()
+        executor = MultiprocessExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

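The executor swap above is groundwork for the plugin mechanism: a
process-based worker can modify os.environ and sys.path for a single
task without leaking the changes into the parent interpreter, which a
thread cannot. A minimal standalone sketch of that isolation property
(the prefix path is hypothetical):

    import multiprocessing
    import os
    import sys

    def worker(prefix):
        # Runs in a child process; these mutations stay in the child.
        os.environ['PATH'] = prefix + os.pathsep + os.environ.get('PATH', '')
        sys.path.insert(0, prefix)
        return sys.path[0]

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=1)
        print(pool.apply(worker, args=('/opt/plugins/example',)))
        pool.close()
        pool.join()
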
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 1c12407..70f9773 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -67,11 +67,11 @@ class OperationTask(BaseTask):
                  max_attempts=None,
                  retry_interval=None,
                  ignore_failure=None,
-                 inputs=None):
+                 inputs=None,
+                 plugin=None):
         """
         Creates an operation task using the name, details, node instance and any additional kwargs.
         :param name: the name of the operation.
-        :param operation_details: the details for the operation.
         :param actor: the operation host on which this operation is registered.
         :param inputs: operation inputs.
+        :param plugin: plugin info dict for the operation's plugin, if any.
         """
@@ -82,6 +82,7 @@ class OperationTask(BaseTask):
         self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
         self.operation_mapping = operation_mapping
         self.inputs = inputs or {}
+        self.plugin = plugin or {}
         self.max_attempts = (self.workflow_context._task_max_attempts
                              if max_attempts is None else max_attempts)
         self.retry_interval = (self.workflow_context._task_retry_interval
@@ -98,15 +99,13 @@ class OperationTask(BaseTask):
         :param name: the name of the operation.
         """
         assert isinstance(instance, models.NodeInstance)
-        operation_details = instance.node.operations[name]
-        operation_inputs = operation_details.get('inputs', {})
-        operation_inputs.update(inputs or {})
-        return cls(name=name,
-                   actor=instance,
-                   operation_mapping=operation_details.get('operation', ''),
-                   inputs=operation_inputs,
-                   *args,
-                   **kwargs)
+        return cls._instance(instance=instance,
+                             name=name,
+                             operation_details=instance.node.operations[name],
+                             inputs=inputs,
+                             plugins=instance.node.plugins or [],
+                             *args,
+                             **kwargs)
 
     @classmethod
     def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs):
@@ -125,12 +124,31 @@ class OperationTask(BaseTask):
                 cls.TARGET_OPERATION, cls.SOURCE_OPERATION
             ))
         operation_details = getattr(instance.relationship, operation_end)[name]
+        if operation_end == cls.SOURCE_OPERATION:
+            plugins = instance.relationship.source_node.plugins
+        else:
+            plugins = instance.relationship.target_node.plugins
+        return cls._instance(instance=instance,
+                             name=name,
+                             operation_details=operation_details,
+                             inputs=inputs,
+                             plugins=plugins or [],
+                             *args,
+                             **kwargs)
+
+    @classmethod
+    def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs):
+        operation_mapping = operation_details.get('operation')
         operation_inputs = operation_details.get('inputs', {})
         operation_inputs.update(inputs or {})
+        plugin_name = operation_details.get('plugin')
+        matching_plugins = [p for p in plugins if p['name'] == plugin_name]
+        plugin = matching_plugins[0] if matching_plugins else {}
         return cls(actor=instance,
                    name=name,
-                   operation_mapping=operation_details.get('operation'),
+                   operation_mapping=operation_mapping,
                    inputs=operation_inputs,
+                   plugin=plugin,
                    *args,
                    **kwargs)
 

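The new _instance() helper above selects the operation's plugin by
matching the 'plugin' name from the operation details against the
node's plugin list, falling back to an empty dict. A self-contained
illustration of that rule (plugin names and data are hypothetical):

    plugins = [
        {'name': 'script', 'package_name': 'aria-script-plugin', 'package_version': '0.1'},
        {'name': 'docker', 'package_name': 'aria-docker-plugin', 'package_version': '2.0'},
    ]
    operation_details = {'operation': 'some.module.task_func', 'plugin': 'docker'}

    plugin_name = operation_details.get('plugin')
    matching_plugins = [p for p in plugins if p['name'] == plugin_name]
    plugin = matching_plugins[0] if matching_plugins else {}
    assert plugin['package_name'] == 'aria-docker-plugin'
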
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 0be17fe..3d869ba 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -106,8 +106,9 @@ class OperationTask(BaseTask):
     def __init__(self, api_task, *args, **kwargs):
         super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
-        base_task_model = api_task._workflow_context.model.task.model_cls
+        model = api_task._workflow_context.model
 
+        base_task_model = model.task.model_cls
         if isinstance(api_task.actor, models.NodeInstance):
             context_class = operation_context.NodeOperationContext
             task_model_cls = base_task_model.as_node_instance
@@ -117,7 +118,11 @@ class OperationTask(BaseTask):
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
                                .format(actor=api_task.actor))
-
+        plugin = api_task.plugin
+        plugins = model.plugin.list(filters={'package_name': plugin.get('package_name', ''),
+                                             'package_version': plugin.get('package_version', '')},
+                                    include=['id'])
+        plugin_id = plugins[0].id if plugins else None
         operation_task = task_model_cls(
             name=api_task.name,
             operation_mapping=api_task.operation_mapping,
@@ -127,6 +132,7 @@ class OperationTask(BaseTask):
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
+            plugin_id=plugin_id
         )
         self._workflow_context.model.task.put(operation_task)
 

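Here the api-level plugin dict is resolved to the id of a stored
plugin row by package name and version, yielding None when nothing
matches. A sketch of that lookup with plain namedtuples standing in
for the storage layer (PluginRow and the sample row are hypothetical):

    import collections

    PluginRow = collections.namedtuple('PluginRow', 'id package_name package_version')
    stored = [PluginRow(1, 'aria-docker-plugin', '2.0')]

    api_plugin = {'package_name': 'aria-docker-plugin', 'package_version': '2.0'}
    matches = [p for p in stored
               if p.package_name == api_plugin.get('package_name', '')
               and p.package_version == api_plugin.get('package_version', '')]
    plugin_id = matches[0].id if matches else None
    assert plugin_id == 1
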
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
 """
 
 
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in the current thread (blocking)
-    """
-
-    def execute(self, task):
-        self._task_started(task)
-        try:
-            task_func = imports.load_attribute(task.operation_mapping)
-            task_func(ctx=task.context, **task.inputs)
-            self._task_succeeded(task)
-        except BaseException as e:
-            self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..7fbf7e9 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,7 +17,10 @@
 Multiprocess based executor
 """
 
+import collections
 import multiprocessing
+import os
+import sys
 import threading
 
 import jsonpickle
@@ -26,73 +29,90 @@ from aria.utils import imports
 from .base import BaseExecutor
 
 
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
 class MultiprocessExecutor(BaseExecutor):
     """
     Executor which runs tasks in a multiprocess environment
     """
 
-    def __init__(self, pool_size=1, *args, **kwargs):
+    def __init__(self, pool_size=1, plugin_manager=None, *args, **kwargs):
         super(MultiprocessExecutor, self).__init__(*args, **kwargs)
         self._stopped = False
-        self._manager = multiprocessing.Manager()
+        self._plugin_manager = plugin_manager
+        self._multiprocessing_manager = multiprocessing.Manager()
-        self._queue = self._manager.Queue()
+        self._queue = self._multiprocessing_manager.Queue()
         self._tasks = {}
         self._listener_thread = threading.Thread(target=self._listener)
         self._listener_thread.daemon = True
         self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size)
-
-    def execute(self, task):
-        self._tasks[task.id] = task
-        self._pool.apply_async(_multiprocess_handler, args=(
-            self._queue,
-            task.context,
-            task.id,
-            task.operation_mapping,
-            task.inputs))
+        self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
 
     def close(self):
         self._pool.close()
         self._stopped = True
         self._pool.join()
+        self._multiprocessing_manager.shutdown()
+        self._multiprocessing_manager.join()
         self._listener_thread.join()
 
+    def execute(self, task):
+        self._tasks[task.id] = task
+        kwargs = {
+            'queue': self._queue,
+            'ctx': task.context,
+            'task_id': task.id,
+            'operation_mapping': task.operation_mapping,
+            'operation_inputs': task.inputs,
+            'plugin_prefix': None
+        }
+        if task.plugin_id and self._plugin_manager:
+            kwargs['plugin_prefix'] = self._plugin_manager.get_plugin_prefix(task.plugin)
+        self._pool.apply_async(_handler, kwds=kwargs)
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
     def _listener(self):
         while not self._stopped:
             try:
                 message = self._queue.get(timeout=1)
-                if message.type == 'task_started':
+                if isinstance(message, _TaskStarted):
                     self._task_started(self._tasks[message.task_id])
-                elif message.type == 'task_succeeded':
+                elif isinstance(message, _TaskSucceeded):
                     self._task_succeeded(self._remove_task(message.task_id))
-                elif message.type == 'task_failed':
+                elif isinstance(message, _TaskFailed):
                     self._task_failed(self._remove_task(message.task_id),
                                       exception=jsonpickle.loads(message.exception))
                 else:
-                    # TODO: something
-                    raise RuntimeError()
+                    raise RuntimeError('Invalid state')
             # Daemon thread: ignore errors (e.g. queue.get timeouts) and keep polling
             except BaseException:
                 pass
 
-    def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
-    def __init__(self, type, task_id, exception=None):
-        self.type = type
-        self.task_id = task_id
-        self.exception = exception
-
 
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
-    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs, plugin_prefix):
+    queue.put(_TaskStarted(task_id))
+    if plugin_prefix:
+        bin_dir = 'Scripts' if os.name == 'nt' else 'bin'
+        os.environ['PATH'] = '{0}{1}{2}'.format(
+            os.path.join(plugin_prefix, bin_dir),
+            os.pathsep,
+            os.environ.get('PATH', ''))
+        if os.name == 'nt':
+            pythonpath_dirs = [os.path.join(plugin_prefix, 'Lib', 'site-packages')]
+        else:
+            pythonpath_dirs = [os.path.join(
+                plugin_prefix, 'lib{0}'.format(b),
+                'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]),
+                'site-packages') for b in ['', '64']]
+        sys.path.extend(pythonpath_dirs)
     try:
         task_func = imports.load_attribute(operation_mapping)
         task_func(ctx=ctx, **operation_inputs)
-        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+        queue.put(_TaskSucceeded(task_id))
     except BaseException as e:
-        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
-                                       exception=jsonpickle.dumps(e)))
+        queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))

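The refactored executor replaces the string-typed _MultiprocessMessage
with one namedtuple per event, so the listener dispatches on message
class instead of comparing strings; a Manager queue is used because
its proxy, unlike a plain multiprocessing.Queue, can be passed to pool
workers. A runnable sketch of that protocol with the task body elided:

    import collections
    import multiprocessing

    _TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
    _TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')

    def handler(queue, task_id):
        queue.put(_TaskStarted(task_id))
        # ... the operation itself would run here ...
        queue.put(_TaskSucceeded(task_id))

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        queue = manager.Queue()
        pool = multiprocessing.Pool(processes=1)
        pool.apply(handler, args=(queue, 'task-1'))
        assert isinstance(queue.get(), _TaskStarted)
        assert isinstance(queue.get(), _TaskSucceeded)
        pool.close()
        pool.join()
        manager.shutdown()
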
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
 
 class ThreadExecutor(BaseExecutor):
     """
-    Executor which runs tasks in a separate thread
+    Executor which runs tasks in separate threads. Tests are easier to
+    write with this executor than with the full-blown multiprocess executor.
     """
 
     def __init__(self, pool_size=1, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 6302e66..d467ce9 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -550,6 +550,11 @@ class Task(SQLModelBase):
     name = Column(String)
     operation_mapping = Column(String)
     inputs = Column(Dict)
+    plugin_id = foreign_key(Plugin.id, nullable=True)
+
+    @declared_attr
+    def plugin(cls):
+        return one_to_many_relationship(cls, Plugin, cls.plugin_id)
 
     @declared_attr
     def execution(cls):

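foreign_key() and one_to_many_relationship() are ARIA's own model
helpers; in plain SQLAlchemy terms the new nullable column and
relationship above correspond roughly to this sketch (table names are
illustrative):

    from sqlalchemy import Column, ForeignKey, Integer
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import relationship

    Base = declarative_base()

    class Plugin(Base):
        __tablename__ = 'plugins'
        id = Column(Integer, primary_key=True)

    class Task(Base):
        __tablename__ = 'tasks'
        id = Column(Integer, primary_key=True)
        # Nullable, since tasks without a plugin remain valid.
        plugin_id = Column(Integer, ForeignKey('plugins.id'), nullable=True)
        plugin = relationship(Plugin)
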
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
new file mode 100644
index 0000000..944ba62
--- /dev/null
+++ b/aria/utils/plugin.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+import subprocess
+import sys
+from datetime import datetime
+
+import wagon
+
+
+def create(source, destination_dir):
+    return wagon.create(source=source, archive_destination_dir=destination_dir)
+
+
+class PluginManager(object):
+
+    def __init__(self, model, plugins_dir):
+        self._model = model
+        self._plugins_dir = plugins_dir
+
+    def install(self, source):
+        metadata = wagon.show(source)
+        cls = self._model.plugin.model_cls
+        plugin = cls(
+            archive_name=metadata['archive_name'],
+            supported_platform=metadata['supported_platform'],
+            supported_py_versions=metadata['supported_python_versions'],
+            distribution=metadata['build_server_os_properties']['distribution'],
+            distribution_release=metadata['build_server_os_properties']['distribution_release'],
+            distribution_version=metadata['build_server_os_properties']['distribution_version'],
+            package_name=metadata['package_name'],
+            package_version=metadata['package_version'],
+            package_source=metadata['package_source'],
+            wheels=metadata['wheels'],
+            uploaded_at=datetime.now()
+        )
+        if len(self._model.plugin.list(filters={'package_name': plugin.package_name,
+                                                'package_version': plugin.package_version})):
+            raise RuntimeError('Plugin {0}-{1} is already installed'.format(
+                plugin.package_name, plugin.package_version))
+
+        self._install_wagon(source=source, prefix=self.get_plugin_prefix(plugin))
+        self._model.plugin.store(plugin)
+        return plugin
+
+    def get_plugin_prefix(self, plugin):
+        return os.path.join(
+            self._plugins_dir,
+            '{0}-{1}'.format(plugin.package_name, plugin.package_version))
+
+    def _install_wagon(self, source, prefix):
+        with tempfile.NamedTemporaryFile() as constraint:
+            constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']))
+            constraint.flush()
+            wagon.install(
+                source=source,
+                install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+                    prefix=prefix,
+                    constraint=constraint.name))

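get_plugin_prefix() above names a per-plugin pip --prefix target, and
the executor's _handler() later derives the bin/ (or Scripts\) and
site-packages directories from that same prefix. A small sketch of the
layout both sides assume (the prefix path is hypothetical):

    import os
    import sys

    prefix = '/opt/aria/plugins/aria-docker-plugin-2.0'
    bin_dir = 'Scripts' if os.name == 'nt' else 'bin'
    executables = os.path.join(prefix, bin_dir)
    site_packages = os.path.join(
        prefix, 'lib',
        'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]),
        'site-packages')
    print(executables)   # e.g. /opt/aria/plugins/aria-docker-plugin-2.0/bin
    print(site_packages)
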
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 31b0b79..0005a5e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,4 +24,5 @@ Jinja2==2.8
 shortuuid==0.4.3
 CacheControl[filecache]==0.11.6
 clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5c3bee47/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 654542c..a76adca 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -33,52 +33,28 @@ from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
     thread,
     multiprocess,
-    blocking,
     # celery
 )
 
 
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if hasattr(self, 'executor'):
-            self.executor.close()
+def test_execute(executor):
+    expected_value = 'value'
+    successful_task = MockTask(mock_successful_task)
+    failing_task = MockTask(mock_failing_task)
+    task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+    for task in [successful_task, failing_task, task_with_inputs]:
+        executor.execute(task)
+
+    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+    def assertion():
+        assert successful_task.states == ['start', 'success']
+        assert failing_task.states == ['start', 'failure']
+        assert task_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_task.exception, MockException)
+        assert isinstance(task_with_inputs.exception, MockException)
+        assert task_with_inputs.exception.message == expected_value
+    assertion()
 
 
 def mock_successful_task(**_):
@@ -128,14 +104,35 @@ class MockTask(object):
         yield self
 
 
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {'pool_size': 1}),
+    (thread.ThreadExecutor, {'pool_size': 2}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+    # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    yield result
+    result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+    def start_handler(task, *args, **kwargs):
+        task.states.append('start')
+
+    def success_handler(task, *args, **kwargs):
+        task.states.append('success')
+
+    def failure_handler(task, exception, *args, **kwargs):
+        task.states.append('failure')
+        task.exception = exception
+    events.start_task_signal.connect(start_handler)
+    events.on_success_task_signal.connect(success_handler)
+    events.on_failure_task_signal.connect(failure_handler)
+    yield
+    events.start_task_signal.disconnect(start_handler)
+    events.on_success_task_signal.disconnect(success_handler)
+    events.on_failure_task_signal.disconnect(failure_handler)
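
The test refactoring replaces the class-based setup_method/teardown_method
with pytest fixtures: the parametrized executor fixture covers what the
pytest.mark.parametrize decorator did, and code after each yield runs as
teardown. A minimal standalone illustration of the pattern (the fixture
contents are illustrative, not the real executors):

    import pytest

    @pytest.fixture(params=[1, 2])
    def pool_size(request):
        # Each param produces one test invocation; statements after the
        # yield run as teardown, replacing teardown_method().
        yield request.param

    def test_pool_size(pool_size):
        assert pool_size in (1, 2)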