You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/11 11:40:56 UTC

[7/7] incubator-ariatosca git commit: ARIA-26 TBD

ARIA-26 TBD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e30e09b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e30e09b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e30e09b

Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
Parents: c6c92ae
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Sun Dec 11 13:40:42 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |   4 +-
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 .../orchestrator/workflows/executor/blocking.py |  36 -------
 .../workflows/executor/multiprocess.py          |  61 ++++++-----
 aria/orchestrator/workflows/executor/thread.py  |   3 +-
 aria/utils/plugin.py                            |  21 ++++
 requirements.txt                                |   3 +-
 .../workflows/executor/test_executor.py         | 103 +++++++++----------
 8 files changed, 108 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 3426bb0..fde1de2 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -33,7 +33,7 @@ from ..logger import LoggerMixin
 from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
 from ..orchestrator.context.workflow import WorkflowContext
 from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor
 from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
 from ..parser.consumption import (
     ConsumptionContext,
@@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = ThreadExecutor()
+        executor = MultiprocessExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
 """
 
 
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in the current thread (blocking)
-    """
-
-    def execute(self, task):
-        self._task_started(task)
-        try:
-            task_func = imports.load_attribute(task.operation_mapping)
-            task_func(ctx=task.context, **task.inputs)
-            self._task_succeeded(task)
-        except BaseException as e:
-            self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..0537381 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,6 +17,7 @@
 Multiprocess based executor
 """
 
+import collections
 import multiprocessing
 import threading
 
@@ -26,6 +27,11 @@ from aria.utils import imports
 from .base import BaseExecutor
 
 
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
 class MultiprocessExecutor(BaseExecutor):
     """
     Executor which runs tasks in a multiprocess environment
@@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
         self._listener_thread = threading.Thread(target=self._listener)
         self._listener_thread.daemon = True
         self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size)
-
-    def execute(self, task):
-        self._tasks[task.id] = task
-        self._pool.apply_async(_multiprocess_handler, args=(
-            self._queue,
-            task.context,
-            task.id,
-            task.operation_mapping,
-            task.inputs))
+        self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
 
     def close(self):
         self._pool.close()
         self._stopped = True
         self._pool.join()
+        self._manager.shutdown()
+        self._manager.join()
         self._listener_thread.join()
 
+    def execute(self, task):
+        self._tasks[task.id] = task
+        self._pool.apply_async(_handler, kwds={
+            'queue': self._queue,
+            'ctx': task.context,
+            'task_id': task.id,
+            'operation_mapping': task.operation_mapping,
+            'operation_inputs': task.inputs
+        })
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
     def _listener(self):
         while not self._stopped:
             try:
                 message = self._queue.get(timeout=1)
-                if message.type == 'task_started':
+                if isinstance(message, _TaskStarted):
                     self._task_started(self._tasks[message.task_id])
-                elif message.type == 'task_succeeded':
+                elif isinstance(message, _TaskSucceeded):
                     self._task_succeeded(self._remove_task(message.task_id))
-                elif message.type == 'task_failed':
+                elif isinstance(message, _TaskFailed):
                     self._task_failed(self._remove_task(message.task_id),
                                       exception=jsonpickle.loads(message.exception))
                 else:
-                    # TODO: something
-                    raise RuntimeError()
+                    raise RuntimeError('Invalid state')
             # Daemon threads
             except BaseException:
                 pass
 
-    def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
-    def __init__(self, type, task_id, exception=None):
-        self.type = type
-        self.task_id = task_id
-        self.exception = exception
-
 
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
-    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+    queue.put(_TaskStarted(task_id))
     try:
         task_func = imports.load_attribute(operation_mapping)
         task_func(ctx=ctx, **operation_inputs)
-        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+        queue.put(_TaskSucceeded(task_id))
     except BaseException as e:
-        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
-                                       exception=jsonpickle.dumps(e)))
+        queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
 
 class ThreadExecutor(BaseExecutor):
     """
-    Executor which runs tasks in a separate thread
+    Executor which runs tasks in a separate thread. It's easier writing tests
+    using this executor rather than the full blown multiprocessing executor.
     """
 
     def __init__(self, pool_size=1, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
index bb2b974..9868c89 100644
--- a/aria/utils/plugin.py
+++ b/aria/utils/plugin.py
@@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code loading
 """
 
 import os
+import tempfile
+import subprocess
+import sys
 from importlib import import_module
 
+import wagon
+
 
 def plugin_installer(path, plugin_suffix, package=None, callback=None):
     """
@@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None, callback=None):
         module = import_module(module_name)
         if callback:
             callback(module)
+
+
+def create(source, destination_dir):
+    return wagon.create(source=source, archive_destination_dir=destination_dir)
+
+
+def install(source, prefix):
+    with tempfile.NamedTemporaryFile() as constraint:
+        constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']))
+        constraint.flush()
+        wagon.install(
+            source=source,
+            install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+                prefix=prefix,
+                constraint=constraint.name)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 7e87c67..7fb842e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,5 @@ Jinja2==2.8
 shortuuid==0.4.3
 CacheControl[filecache]==0.11.6
 clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index a425799..fc81ecc 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,7 +25,6 @@ from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
     thread,
     multiprocess,
-    blocking,
     # celery
 )
 
@@ -38,47 +37,24 @@ except ImportError:
     app = None
 
 
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if hasattr(self, 'executor'):
-            self.executor.close()
+def test_execute(executor):
+    expected_value = 'value'
+    successful_task = MockTask(mock_successful_task)
+    failing_task = MockTask(mock_failing_task)
+    task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+    for task in [successful_task, failing_task, task_with_inputs]:
+        executor.execute(task)
+
+    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+    def assertion():
+        assert successful_task.states == ['start', 'success']
+        assert failing_task.states == ['start', 'failure']
+        assert task_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_task.exception, MockException)
+        assert isinstance(task_with_inputs.exception, MockException)
+        assert task_with_inputs.exception.message == expected_value
+    assertion()
 
 
 def mock_successful_task(**_):
@@ -128,14 +104,35 @@ class MockTask(object):
         yield self
 
 
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {'pool_size': 1}),
+    (thread.ThreadExecutor, {'pool_size': 2}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+    # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    yield result
+    result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+    def start_handler(task, *args, **kwargs):
+        task.states.append('start')
+
+    def success_handler(task, *args, **kwargs):
+        task.states.append('success')
+
+    def failure_handler(task, exception, *args, **kwargs):
+        task.states.append('failure')
+        task.exception = exception
+    events.start_task_signal.connect(start_handler)
+    events.on_success_task_signal.connect(success_handler)
+    events.on_failure_task_signal.connect(failure_handler)
+    yield
+    events.start_task_signal.disconnect(start_handler)
+    events.on_success_task_signal.disconnect(success_handler)
+    events.on_failure_task_signal.disconnect(failure_handler)


Re: [7/7] incubator-ariatosca git commit: ARIA-26 TBD

Posted by Dan Kilman <da...@gigaspaces.com>.
Yes

On Sun, Dec 11, 2016 at 1:50 PM, Arthur Berezin <ar...@gigaspaces.com>
wrote:

> Dan, is this commit part of the plugin mechanism for ARIA?
> https://issues.apache.org/jira/browse/ARIATOSCA-26
> The title of the commit got me confused.
>
>
> On Sun, Dec 11, 2016 at 1:40 PM <da...@apache.org> wrote:
>
> ARIA-26 TBD
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> commit/4e30e09b
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> tree/4e30e09b
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> diff/4e30e09b
>
> Branch: refs/heads/ARIA-26-plugin-mechanism
> Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
> Parents: c6c92ae
> Author: Dan Kilman <da...@gigaspaces.com>
> Authored: Sun Nov 27 16:31:29 2016 +0200
> Committer: Dan Kilman <da...@gigaspaces.com>
> Committed: Sun Dec 11 13:40:42 2016 +0200
>
> ----------------------------------------------------------------------
>  aria/cli/commands.py                            |   4 +-
>  .../orchestrator/workflows/executor/__init__.py |   2 +-
>  .../orchestrator/workflows/executor/blocking.py |  36 -------
>  .../workflows/executor/multiprocess.py          |  61 ++++++-----
>  aria/orchestrator/workflows/executor/thread.py  |   3 +-
>  aria/utils/plugin.py                            |  21 ++++
>  requirements.txt                                |   3 +-
>  .../workflows/executor/test_executor.py         | 103 +++++++++----------
>  8 files changed, 108 insertions(+), 125 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/cli/commands.py
> ----------------------------------------------------------------------
> diff --git a/aria/cli/commands.py b/aria/cli/commands.py
> index 3426bb0..fde1de2 100644
> --- a/aria/cli/commands.py
> +++ b/aria/cli/commands.py
> @@ -33,7 +33,7 @@ from ..logger import LoggerMixin
>  from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
>  from ..orchestrator.context.workflow import WorkflowContext
>  from ..orchestrator.workflows.core.engine import Engine
> -from ..orchestrator.workflows.executor.thread import ThreadExecutor
> +from ..orchestrator.workflows.executor.multiprocess import
> MultiprocessExecutor
>  from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
>  from ..parser.consumption import (
>      ConsumptionContext,
> @@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
>          )
>          workflow_function = self._load_workflow_handler(
> workflow['operation'])
>          tasks_graph = workflow_function(workflow_context,
> **workflow_context.parameters)
> -        executor = ThreadExecutor()
> +        executor = MultiprocessExecutor()
>          workflow_engine = Engine(executor=executor,
>                                   workflow_context=workflow_context,
>                                   tasks_graph=tasks_graph)
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/__init__.py
> b/aria/orchestrator/workflows/executor/__init__.py
> index 16b6c9b..7b205c1 100644
> --- a/aria/orchestrator/workflows/executor/__init__.py
> +++ b/aria/orchestrator/workflows/executor/__init__.py
> @@ -18,5 +18,5 @@ Executors for task execution
>  """
>
>
> -from . import blocking, multiprocess, thread
> +from . import multiprocess, thread
>  from .base import BaseExecutor
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/blocking.py
> b/aria/orchestrator/workflows/executor/blocking.py
> deleted file mode 100644
> index 9d3a9ba..0000000
> --- a/aria/orchestrator/workflows/executor/blocking.py
> +++ /dev/null
> @@ -1,36 +0,0 @@
> -# Licensed to the Apache Software Foundation (ASF) under one or more
> -# contributor license agreements.  See the NOTICE file distributed with
> -# this work for additional information regarding copyright ownership.
> -# The ASF licenses this file to You under the Apache License, Version 2.0
> -# (the "License"); you may not use this file except in compliance with
> -# the License.  You may obtain a copy of the License at
> -#
> -#     http://www.apache.org/licenses/LICENSE-2.0
> -#
> -# Unless required by applicable law or agreed to in writing, software
> -# distributed under the License is distributed on an "AS IS" BASIS,
> -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> -# See the License for the specific language governing permissions and
> -# limitations under the License.
> -
> -"""
> -Blocking executor
> -"""
> -
> -from aria.utils import imports
> -from .base import BaseExecutor
> -
> -
> -class CurrentThreadBlockingExecutor(BaseExecutor):
> -    """
> -    Executor which runs tasks in the current thread (blocking)
> -    """
> -
> -    def execute(self, task):
> -        self._task_started(task)
> -        try:
> -            task_func = imports.load_attribute(task.operation_mapping)
> -            task_func(ctx=task.context, **task.inputs)
> -            self._task_succeeded(task)
> -        except BaseException as e:
> -            self._task_failed(task, exception=e)
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/multiprocess.py
> b/aria/orchestrator/workflows/executor/multiprocess.py
> index d770e07..0537381 100644
> --- a/aria/orchestrator/workflows/executor/multiprocess.py
> +++ b/aria/orchestrator/workflows/executor/multiprocess.py
> @@ -17,6 +17,7 @@
>  Multiprocess based executor
>  """
>
> +import collections
>  import multiprocessing
>  import threading
>
> @@ -26,6 +27,11 @@ from aria.utils import imports
>  from .base import BaseExecutor
>
>
> +_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
> +_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
> +_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
> +
> +
>  class MultiprocessExecutor(BaseExecutor):
>      """
>      Executor which runs tasks in a multiprocess environment
> @@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
>          self._listener_thread = threading.Thread(target=self._listener)
>          self._listener_thread.daemon = True
>          self._listener_thread.start()
> -        self._pool = multiprocessing.Pool(processes=pool_size)
> -
> -    def execute(self, task):
> -        self._tasks[task.id] = task
> -        self._pool.apply_async(_multiprocess_handler, args=(
> -            self._queue,
> -            task.context,
> -            task.id,
> -            task.operation_mapping,
> -            task.inputs))
> +        self._pool = multiprocessing.Pool(processes=pool_size,
> maxtasksperchild=1)
>
>      def close(self):
>          self._pool.close()
>          self._stopped = True
>          self._pool.join()
> +        self._manager.shutdown()
> +        self._manager.join()
>          self._listener_thread.join()
>
> +    def execute(self, task):
> +        self._tasks[task.id] = task
> +        self._pool.apply_async(_handler, kwds={
> +            'queue': self._queue,
> +            'ctx': task.context,
> +            'task_id': task.id,
> +            'operation_mapping': task.operation_mapping,
> +            'operation_inputs': task.inputs
> +        })
> +
> +    def _remove_task(self, task_id):
> +        return self._tasks.pop(task_id)
> +
>      def _listener(self):
>          while not self._stopped:
>              try:
>                  message = self._queue.get(timeout=1)
> -                if message.type == 'task_started':
> +                if isinstance(message, _TaskStarted):
>                      self._task_started(self._tasks[message.task_id])
> -                elif message.type == 'task_succeeded':
> +                elif isinstance(message, _TaskSucceeded):
>                      self._task_succeeded(self._
> remove_task(message.task_id))
> -                elif message.type == 'task_failed':
> +                elif isinstance(message, _TaskFailed):
>                      self._task_failed(self._remove_task(message.task_id),
>                                        exception=jsonpickle.loads(
> message.exception))
>                  else:
> -                    # TODO: something
> -                    raise RuntimeError()
> +                    raise RuntimeError('Invalid state')
>              # Daemon threads
>              except BaseException:
>                  pass
>
> -    def _remove_task(self, task_id):
> -        return self._tasks.pop(task_id)
> -
> -
> -class _MultiprocessMessage(object):
> -
> -    def __init__(self, type, task_id, exception=None):
> -        self.type = type
> -        self.task_id = task_id
> -        self.exception = exception
> -
>
> -def _multiprocess_handler(queue, ctx, task_id, operation_mapping,
> operation_inputs):
> -    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
> +def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
> +    queue.put(_TaskStarted(task_id))
>      try:
>          task_func = imports.load_attribute(operation_mapping)
>          task_func(ctx=ctx, **operation_inputs)
> -        queue.put(_MultiprocessMessage(type='task_succeeded',
> task_id=task_id))
> +        queue.put(_TaskSucceeded(task_id))
>      except BaseException as e:
> -        queue.put(_MultiprocessMessage(type='task_failed',
> task_id=task_id,
> -                                       exception=jsonpickle.dumps(e)))
> +        queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/thread.py
> b/aria/orchestrator/workflows/executor/thread.py
> index 76ceefd..16a58fb 100644
> --- a/aria/orchestrator/workflows/executor/thread.py
> +++ b/aria/orchestrator/workflows/executor/thread.py
> @@ -26,7 +26,8 @@ from .base import BaseExecutor
>
>  class ThreadExecutor(BaseExecutor):
>      """
> -    Executor which runs tasks in a separate thread
> +    Executor which runs tasks in a separate thread. It's easier writing
> tests
> +    using this executor rather than the full blown multiprocessing
> executor.
>      """
>
>      def __init__(self, pool_size=1, *args, **kwargs):
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/utils/plugin.py
> ----------------------------------------------------------------------
> diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
> index bb2b974..9868c89 100644
> --- a/aria/utils/plugin.py
> +++ b/aria/utils/plugin.py
> @@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python
> code loading
>  """
>
>  import os
> +import tempfile
> +import subprocess
> +import sys
>  from importlib import import_module
>
> +import wagon
> +
>
>  def plugin_installer(path, plugin_suffix, package=None, callback=None):
>      """
> @@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None,
> callback=None):
>          module = import_module(module_name)
>          if callback:
>              callback(module)
> +
> +
> +def create(source, destination_dir):
> +    return wagon.create(source=source, archive_destination_dir=
> destination_dir)
> +
> +
> +def install(source, prefix):
> +    with tempfile.NamedTemporaryFile() as constraint:
> +        constraint.write(subprocess.check_output([sys.executable, '-m',
> 'pip', 'freeze']))
> +        constraint.flush()
> +        wagon.install(
> +            source=source,
> +            install_args='--prefix="{prefix}"
> --constraint="{constraint}"'.format(
> +                prefix=prefix,
> +                constraint=constraint.name)
> +        )
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/requirements.txt
> ----------------------------------------------------------------------
> diff --git a/requirements.txt b/requirements.txt
> index 7e87c67..7fb842e 100644
> --- a/requirements.txt
> +++ b/requirements.txt
> @@ -23,4 +23,5 @@ Jinja2==2.8
>  shortuuid==0.4.3
>  CacheControl[filecache]==0.11.6
>  clint==0.5.1
> -SQLAlchemy==1.1.4
> \ No newline at end of file
> +SQLAlchemy==1.1.4
> +wagon==0.5.0
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
> ----------------------------------------------------------------------
> diff --git a/tests/orchestrator/workflows/executor/test_executor.py
> b/tests/orchestrator/workflows/executor/test_executor.py
> index a425799..fc81ecc 100644
> --- a/tests/orchestrator/workflows/executor/test_executor.py
> +++ b/tests/orchestrator/workflows/executor/test_executor.py
> @@ -25,7 +25,6 @@ from aria.orchestrator import events
>  from aria.orchestrator.workflows.executor import (
>      thread,
>      multiprocess,
> -    blocking,
>      # celery
>  )
>
> @@ -38,47 +37,24 @@ except ImportError:
>      app = None
>
>
> -class TestExecutor(object):
> -
> -    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
> -        (thread.ThreadExecutor, {'pool_size': 1}),
> -        (thread.ThreadExecutor, {'pool_size': 2}),
> -        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
> -        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
> -        (blocking.CurrentThreadBlockingExecutor, {}),
> -        # (celery.CeleryExecutor, {'app': app})
> -    ])
> -    def test_execute(self, executor_cls, executor_kwargs):
> -        self.executor = executor_cls(**executor_kwargs)
> -        expected_value = 'value'
> -        successful_task = MockTask(mock_successful_task)
> -        failing_task = MockTask(mock_failing_task)
> -        task_with_inputs = MockTask(mock_task_with_input,
> inputs={'input': expected_value})
> -
> -        for task in [successful_task, failing_task, task_with_inputs]:
> -            self.executor.execute(task)
> -
> -        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
> -        def assertion():
> -            assert successful_task.states == ['start', 'success']
> -            assert failing_task.states == ['start', 'failure']
> -            assert task_with_inputs.states == ['start', 'failure']
> -            assert isinstance(failing_task.exception, MockException)
> -            assert isinstance(task_with_inputs.exception, MockException)
> -            assert task_with_inputs.exception.message == expected_value
> -        assertion()
> -
> -    def setup_method(self):
> -        events.start_task_signal.connect(start_handler)
> -        events.on_success_task_signal.connect(success_handler)
> -        events.on_failure_task_signal.connect(failure_handler)
> -
> -    def teardown_method(self):
> -        events.start_task_signal.disconnect(start_handler)
> -        events.on_success_task_signal.disconnect(success_handler)
> -        events.on_failure_task_signal.disconnect(failure_handler)
> -        if hasattr(self, 'executor'):
> -            self.executor.close()
> +def test_execute(executor):
> +    expected_value = 'value'
> +    successful_task = MockTask(mock_successful_task)
> +    failing_task = MockTask(mock_failing_task)
> +    task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
> expected_value})
> +
> +    for task in [successful_task, failing_task, task_with_inputs]:
> +        executor.execute(task)
> +
> +    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
> +    def assertion():
> +        assert successful_task.states == ['start', 'success']
> +        assert failing_task.states == ['start', 'failure']
> +        assert task_with_inputs.states == ['start', 'failure']
> +        assert isinstance(failing_task.exception, MockException)
> +        assert isinstance(task_with_inputs.exception, MockException)
> +        assert task_with_inputs.exception.message == expected_value
> +    assertion()
>
>
>  def mock_successful_task(**_):
> @@ -128,14 +104,35 @@ class MockTask(object):
>          yield self
>
>
> -def start_handler(task, *args, **kwargs):
> -    task.states.append('start')
> -
> -
> -def success_handler(task, *args, **kwargs):
> -    task.states.append('success')
> -
> -
> -def failure_handler(task, exception, *args, **kwargs):
> -    task.states.append('failure')
> -    task.exception = exception
> +@pytest.fixture(params=[
> +    (thread.ThreadExecutor, {'pool_size': 1}),
> +    (thread.ThreadExecutor, {'pool_size': 2}),
> +    (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
> +    (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
> +    # (celery.CeleryExecutor, {'app': app})
> +])
> +def executor(request):
> +    executor_cls, executor_kwargs = request.param
> +    result = executor_cls(**executor_kwargs)
> +    yield result
> +    result.close()
> +
> +
> +@pytest.fixture(autouse=True)
> +def register_signals():
> +    def start_handler(task, *args, **kwargs):
> +        task.states.append('start')
> +
> +    def success_handler(task, *args, **kwargs):
> +        task.states.append('success')
> +
> +    def failure_handler(task, exception, *args, **kwargs):
> +        task.states.append('failure')
> +        task.exception = exception
> +    events.start_task_signal.connect(start_handler)
> +    events.on_success_task_signal.connect(success_handler)
> +    events.on_failure_task_signal.connect(failure_handler)
> +    yield
> +    events.start_task_signal.disconnect(start_handler)
> +    events.on_success_task_signal.disconnect(success_handler)
> +    events.on_failure_task_signal.disconnect(failure_handler)
>
>

Re: [7/7] incubator-ariatosca git commit: ARIA-26 TBD

Posted by Arthur Berezin <ar...@gigaspaces.com>.
Dan, is this commit part of the plugin mechanism for ARIA?
https://issues.apache.org/jira/browse/ARIATOSCA-26
The title of the commit confused me.


On Sun, Dec 11, 2016 at 1:40 PM <da...@apache.org> wrote:

ARIA-26 TBD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e30e09b
Tree:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e30e09b
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e30e09b

Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
Parents: c6c92ae
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Sun Dec 11 13:40:42 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |   4 +-
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 .../orchestrator/workflows/executor/blocking.py |  36 -------
 .../workflows/executor/multiprocess.py          |  61 ++++++-----
 aria/orchestrator/workflows/executor/thread.py  |   3 +-
 aria/utils/plugin.py                            |  21 ++++
 requirements.txt                                |   3 +-
 .../workflows/executor/test_executor.py         | 103 +++++++++----------
 8 files changed, 108 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 3426bb0..fde1de2 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -33,7 +33,7 @@ from ..logger import LoggerMixin
 from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
 from ..orchestrator.context.workflow import WorkflowContext
 from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import
MultiprocessExecutor
 from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
 from ..parser.consumption import (
     ConsumptionContext,
@@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function =
self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context,
**workflow_context.parameters)
-        executor = ThreadExecutor()
+        executor = MultiprocessExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py
b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
 """


-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py
b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in the current thread (blocking)
-    """
-
-    def execute(self, task):
-        self._task_started(task)
-        try:
-            task_func = imports.load_attribute(task.operation_mapping)
-            task_func(ctx=task.context, **task.inputs)
-            self._task_succeeded(task)
-        except BaseException as e:
-            self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py
b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..0537381 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,6 +17,7 @@
 Multiprocess based executor
 """

+import collections
 import multiprocessing
 import threading

@@ -26,6 +27,11 @@ from aria.utils import imports
 from .base import BaseExecutor


+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
 class MultiprocessExecutor(BaseExecutor):
     """
     Executor which runs tasks in a multiprocess environment
@@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
         self._listener_thread = threading.Thread(target=self._listener)
         self._listener_thread.daemon = True
         self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size)
-
-    def execute(self, task):
-        self._tasks[task.id] = task
-        self._pool.apply_async(_multiprocess_handler, args=(
-            self._queue,
-            task.context,
-            task.id,
-            task.operation_mapping,
-            task.inputs))
+        self._pool = multiprocessing.Pool(processes=pool_size,
maxtasksperchild=1)

     def close(self):
         self._pool.close()
         self._stopped = True
         self._pool.join()
+        self._manager.shutdown()
+        self._manager.join()
         self._listener_thread.join()

+    def execute(self, task):
+        self._tasks[task.id] = task
+        self._pool.apply_async(_handler, kwds={
+            'queue': self._queue,
+            'ctx': task.context,
+            'task_id': task.id,
+            'operation_mapping': task.operation_mapping,
+            'operation_inputs': task.inputs
+        })
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
     def _listener(self):
         while not self._stopped:
             try:
                 message = self._queue.get(timeout=1)
-                if message.type == 'task_started':
+                if isinstance(message, _TaskStarted):
                     self._task_started(self._tasks[message.task_id])
-                elif message.type == 'task_succeeded':
+                elif isinstance(message, _TaskSucceeded):

 self._task_succeeded(self._remove_task(message.task_id))
-                elif message.type == 'task_failed':
+                elif isinstance(message, _TaskFailed):
                     self._task_failed(self._remove_task(message.task_id),

 exception=jsonpickle.loads(message.exception))
                 else:
-                    # TODO: something
-                    raise RuntimeError()
+                    raise RuntimeError('Invalid state')
             # Daemon threads
             except BaseException:
                 pass

-    def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
-    def __init__(self, type, task_id, exception=None):
-        self.type = type
-        self.task_id = task_id
-        self.exception = exception
-

-def _multiprocess_handler(queue, ctx, task_id, operation_mapping,
operation_inputs):
-    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+    queue.put(_TaskStarted(task_id))
     try:
         task_func = imports.load_attribute(operation_mapping)
         task_func(ctx=ctx, **operation_inputs)
-        queue.put(_MultiprocessMessage(type='task_succeeded',
task_id=task_id))
+        queue.put(_TaskSucceeded(task_id))
     except BaseException as e:
-        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
-                                       exception=jsonpickle.dumps(e)))
+        queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py
b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor

 class ThreadExecutor(BaseExecutor):
     """
-    Executor which runs tasks in a separate thread
+    Executor which runs tasks in a separate thread. It's easier writing
tests
+    using this executor rather than the full blown multiprocessing
executor.
     """

     def __init__(self, pool_size=1, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
index bb2b974..9868c89 100644
--- a/aria/utils/plugin.py
+++ b/aria/utils/plugin.py
@@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code
loading
 """

 import os
+import tempfile
+import subprocess
+import sys
 from importlib import import_module

+import wagon
+

 def plugin_installer(path, plugin_suffix, package=None, callback=None):
     """
@@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None,
callback=None):
         module = import_module(module_name)
         if callback:
             callback(module)
+
+
+def create(source, destination_dir):
+    return wagon.create(source=source,
archive_destination_dir=destination_dir)
+
+
+def install(source, prefix):
+    with tempfile.NamedTemporaryFile() as constraint:
+        constraint.write(subprocess.check_output([sys.executable, '-m',
'pip', 'freeze']))
+        constraint.flush()
+        wagon.install(
+            source=source,
+            install_args='--prefix="{prefix}"
--constraint="{constraint}"'.format(
+                prefix=prefix,
+                constraint=constraint.name)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 7e87c67..7fb842e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,5 @@ Jinja2==2.8
 shortuuid==0.4.3
 CacheControl[filecache]==0.11.6
 clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py
b/tests/orchestrator/workflows/executor/test_executor.py
index a425799..fc81ecc 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,7 +25,6 @@ from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
     thread,
     multiprocess,
-    blocking,
     # celery
 )

@@ -38,47 +37,24 @@ except ImportError:
     app = None


-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if hasattr(self, 'executor'):
-            self.executor.close()
+def test_execute(executor):
+    expected_value = 'value'
+    successful_task = MockTask(mock_successful_task)
+    failing_task = MockTask(mock_failing_task)
+    task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
expected_value})
+
+    for task in [successful_task, failing_task, task_with_inputs]:
+        executor.execute(task)
+
+    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+    def assertion():
+        assert successful_task.states == ['start', 'success']
+        assert failing_task.states == ['start', 'failure']
+        assert task_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_task.exception, MockException)
+        assert isinstance(task_with_inputs.exception, MockException)
+        assert task_with_inputs.exception.message == expected_value
+    assertion()


 def mock_successful_task(**_):
@@ -128,14 +104,35 @@ class MockTask(object):
         yield self


-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {'pool_size': 1}),
+    (thread.ThreadExecutor, {'pool_size': 2}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+    (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+    # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    yield result
+    result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+    def start_handler(task, *args, **kwargs):
+        task.states.append('start')
+
+    def success_handler(task, *args, **kwargs):
+        task.states.append('success')
+
+    def failure_handler(task, exception, *args, **kwargs):
+        task.states.append('failure')
+        task.exception = exception
+    events.start_task_signal.connect(start_handler)
+    events.on_success_task_signal.connect(success_handler)
+    events.on_failure_task_signal.connect(failure_handler)
+    yield
+    events.start_task_signal.disconnect(start_handler)
+    events.on_success_task_signal.disconnect(success_handler)
+    events.on_failure_task_signal.disconnect(failure_handler)