You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by da...@apache.org on 2016/12/11 11:40:56 UTC
[7/7] incubator-ariatosca git commit: ARIA-26 TBD
ARIA-26 TBD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e30e09b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e30e09b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e30e09b
Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
Parents: c6c92ae
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Sun Dec 11 13:40:42 2016 +0200
----------------------------------------------------------------------
aria/cli/commands.py | 4 +-
.../orchestrator/workflows/executor/__init__.py | 2 +-
.../orchestrator/workflows/executor/blocking.py | 36 -------
.../workflows/executor/multiprocess.py | 61 ++++++-----
aria/orchestrator/workflows/executor/thread.py | 3 +-
aria/utils/plugin.py | 21 ++++
requirements.txt | 3 +-
.../workflows/executor/test_executor.py | 103 +++++++++----------
8 files changed, 108 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 3426bb0..fde1de2 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -33,7 +33,7 @@ from ..logger import LoggerMixin
from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
from ..orchestrator.context.workflow import WorkflowContext
from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor
from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
from ..parser.consumption import (
ConsumptionContext,
@@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
)
workflow_function = self._load_workflow_handler(workflow['operation'])
tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
- executor = ThreadExecutor()
+ executor = MultiprocessExecutor()
workflow_engine = Engine(executor=executor,
workflow_context=workflow_context,
tasks_graph=tasks_graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
"""
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
from .base import BaseExecutor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
- """
- Executor which runs tasks in the current thread (blocking)
- """
-
- def execute(self, task):
- self._task_started(task)
- try:
- task_func = imports.load_attribute(task.operation_mapping)
- task_func(ctx=task.context, **task.inputs)
- self._task_succeeded(task)
- except BaseException as e:
- self._task_failed(task, exception=e)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..0537381 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,6 +17,7 @@
Multiprocess based executor
"""
+import collections
import multiprocessing
import threading
@@ -26,6 +27,11 @@ from aria.utils import imports
from .base import BaseExecutor
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
class MultiprocessExecutor(BaseExecutor):
"""
Executor which runs tasks in a multiprocess environment
@@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
self._listener_thread = threading.Thread(target=self._listener)
self._listener_thread.daemon = True
self._listener_thread.start()
- self._pool = multiprocessing.Pool(processes=pool_size)
-
- def execute(self, task):
- self._tasks[task.id] = task
- self._pool.apply_async(_multiprocess_handler, args=(
- self._queue,
- task.context,
- task.id,
- task.operation_mapping,
- task.inputs))
+ self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
def close(self):
self._pool.close()
self._stopped = True
self._pool.join()
+ self._manager.shutdown()
+ self._manager.join()
self._listener_thread.join()
+ def execute(self, task):
+ self._tasks[task.id] = task
+ self._pool.apply_async(_handler, kwds={
+ 'queue': self._queue,
+ 'ctx': task.context,
+ 'task_id': task.id,
+ 'operation_mapping': task.operation_mapping,
+ 'operation_inputs': task.inputs
+ })
+
+ def _remove_task(self, task_id):
+ return self._tasks.pop(task_id)
+
def _listener(self):
while not self._stopped:
try:
message = self._queue.get(timeout=1)
- if message.type == 'task_started':
+ if isinstance(message, _TaskStarted):
self._task_started(self._tasks[message.task_id])
- elif message.type == 'task_succeeded':
+ elif isinstance(message, _TaskSucceeded):
self._task_succeeded(self._remove_task(message.task_id))
- elif message.type == 'task_failed':
+ elif isinstance(message, _TaskFailed):
self._task_failed(self._remove_task(message.task_id),
exception=jsonpickle.loads(message.exception))
else:
- # TODO: something
- raise RuntimeError()
+ raise RuntimeError('Invalid state')
# Daemon threads
except BaseException:
pass
- def _remove_task(self, task_id):
- return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
- def __init__(self, type, task_id, exception=None):
- self.type = type
- self.task_id = task_id
- self.exception = exception
-
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
- queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+ queue.put(_TaskStarted(task_id))
try:
task_func = imports.load_attribute(operation_mapping)
task_func(ctx=ctx, **operation_inputs)
- queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+ queue.put(_TaskSucceeded(task_id))
except BaseException as e:
- queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
- exception=jsonpickle.dumps(e)))
+ queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
class ThreadExecutor(BaseExecutor):
"""
- Executor which runs tasks in a separate thread
+ Executor which runs tasks in a separate thread. It's easier writing tests
+ using this executor rather than the full blown multiprocessing executor.
"""
def __init__(self, pool_size=1, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
index bb2b974..9868c89 100644
--- a/aria/utils/plugin.py
+++ b/aria/utils/plugin.py
@@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code loading
"""
import os
+import tempfile
+import subprocess
+import sys
from importlib import import_module
+import wagon
+
def plugin_installer(path, plugin_suffix, package=None, callback=None):
"""
@@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None, callback=None):
module = import_module(module_name)
if callback:
callback(module)
+
+
+def create(source, destination_dir):
+ return wagon.create(source=source, archive_destination_dir=destination_dir)
+
+
+def install(source, prefix):
+ with tempfile.NamedTemporaryFile() as constraint:
+ constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']))
+ constraint.flush()
+ wagon.install(
+ source=source,
+ install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+ prefix=prefix,
+ constraint=constraint.name)
+ )
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 7e87c67..7fb842e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,5 @@ Jinja2==2.8
shortuuid==0.4.3
CacheControl[filecache]==0.11.6
clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index a425799..fc81ecc 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,7 +25,6 @@ from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
thread,
multiprocess,
- blocking,
# celery
)
@@ -38,47 +37,24 @@ except ImportError:
app = None
-class TestExecutor(object):
-
- @pytest.mark.parametrize('executor_cls,executor_kwargs', [
- (thread.ThreadExecutor, {'pool_size': 1}),
- (thread.ThreadExecutor, {'pool_size': 2}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
- (blocking.CurrentThreadBlockingExecutor, {}),
- # (celery.CeleryExecutor, {'app': app})
- ])
- def test_execute(self, executor_cls, executor_kwargs):
- self.executor = executor_cls(**executor_kwargs)
- expected_value = 'value'
- successful_task = MockTask(mock_successful_task)
- failing_task = MockTask(mock_failing_task)
- task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
- for task in [successful_task, failing_task, task_with_inputs]:
- self.executor.execute(task)
-
- @retrying.retry(stop_max_delay=10000, wait_fixed=100)
- def assertion():
- assert successful_task.states == ['start', 'success']
- assert failing_task.states == ['start', 'failure']
- assert task_with_inputs.states == ['start', 'failure']
- assert isinstance(failing_task.exception, MockException)
- assert isinstance(task_with_inputs.exception, MockException)
- assert task_with_inputs.exception.message == expected_value
- assertion()
-
- def setup_method(self):
- events.start_task_signal.connect(start_handler)
- events.on_success_task_signal.connect(success_handler)
- events.on_failure_task_signal.connect(failure_handler)
-
- def teardown_method(self):
- events.start_task_signal.disconnect(start_handler)
- events.on_success_task_signal.disconnect(success_handler)
- events.on_failure_task_signal.disconnect(failure_handler)
- if hasattr(self, 'executor'):
- self.executor.close()
+def test_execute(executor):
+ expected_value = 'value'
+ successful_task = MockTask(mock_successful_task)
+ failing_task = MockTask(mock_failing_task)
+ task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+ for task in [successful_task, failing_task, task_with_inputs]:
+ executor.execute(task)
+
+ @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+ def assertion():
+ assert successful_task.states == ['start', 'success']
+ assert failing_task.states == ['start', 'failure']
+ assert task_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_task.exception, MockException)
+ assert isinstance(task_with_inputs.exception, MockException)
+ assert task_with_inputs.exception.message == expected_value
+ assertion()
def mock_successful_task(**_):
@@ -128,14 +104,35 @@ class MockTask(object):
yield self
-def start_handler(task, *args, **kwargs):
- task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
- task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
- task.states.append('failure')
- task.exception = exception
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+ # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+ def start_handler(task, *args, **kwargs):
+ task.states.append('start')
+
+ def success_handler(task, *args, **kwargs):
+ task.states.append('success')
+
+ def failure_handler(task, exception, *args, **kwargs):
+ task.states.append('failure')
+ task.exception = exception
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)
Re: [7/7] incubator-ariatosca git commit: ARIA-26 TBD
Posted by Dan Kilman <da...@gigaspaces.com>.
Yes
On Sun, Dec 11, 2016 at 1:50 PM, Arthur Berezin <ar...@gigaspaces.com>
wrote:
> Dan, is this commit is part of the plugin mechanism for ARIA?
> https://issues.apache.org/jira/browse/ARIATOSCA-26
> The title of the commit got me confused.
>
>
> On Sun, Dec 11, 2016 at 1:40 PM <da...@apache.org> wrote:
>
> ARIA-26 TBD
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> commit/4e30e09b
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> tree/4e30e09b
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> diff/4e30e09b
>
> Branch: refs/heads/ARIA-26-plugin-mechanism
> Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
> Parents: c6c92ae
> Author: Dan Kilman <da...@gigaspaces.com>
> Authored: Sun Nov 27 16:31:29 2016 +0200
> Committer: Dan Kilman <da...@gigaspaces.com>
> Committed: Sun Dec 11 13:40:42 2016 +0200
>
> ----------------------------------------------------------------------
> aria/cli/commands.py | 4 +-
> .../orchestrator/workflows/executor/__init__.py | 2 +-
> .../orchestrator/workflows/executor/blocking.py | 36 -------
> .../workflows/executor/multiprocess.py | 61 ++++++-----
> aria/orchestrator/workflows/executor/thread.py | 3 +-
> aria/utils/plugin.py | 21 ++++
> requirements.txt | 3 +-
> .../workflows/executor/test_executor.py | 103 +++++++++----------
> 8 files changed, 108 insertions(+), 125 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/cli/commands.py
> ----------------------------------------------------------------------
> diff --git a/aria/cli/commands.py b/aria/cli/commands.py
> index 3426bb0..fde1de2 100644
> --- a/aria/cli/commands.py
> +++ b/aria/cli/commands.py
> @@ -33,7 +33,7 @@ from ..logger import LoggerMixin
> from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
> from ..orchestrator.context.workflow import WorkflowContext
> from ..orchestrator.workflows.core.engine import Engine
> -from ..orchestrator.workflows.executor.thread import ThreadExecutor
> +from ..orchestrator.workflows.executor.multiprocess import
> MultiprocessExecutor
> from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
> from ..parser.consumption import (
> ConsumptionContext,
> @@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
> )
> workflow_function = self._load_workflow_handler(
> workflow['operation'])
> tasks_graph = workflow_function(workflow_context,
> **workflow_context.parameters)
> - executor = ThreadExecutor()
> + executor = MultiprocessExecutor()
> workflow_engine = Engine(executor=executor,
> workflow_context=workflow_context,
> tasks_graph=tasks_graph)
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/__init__.py
> b/aria/orchestrator/workflows/executor/__init__.py
> index 16b6c9b..7b205c1 100644
> --- a/aria/orchestrator/workflows/executor/__init__.py
> +++ b/aria/orchestrator/workflows/executor/__init__.py
> @@ -18,5 +18,5 @@ Executors for task execution
> """
>
>
> -from . import blocking, multiprocess, thread
> +from . import multiprocess, thread
> from .base import BaseExecutor
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/blocking.py
> b/aria/orchestrator/workflows/executor/blocking.py
> deleted file mode 100644
> index 9d3a9ba..0000000
> --- a/aria/orchestrator/workflows/executor/blocking.py
> +++ /dev/null
> @@ -1,36 +0,0 @@
> -# Licensed to the Apache Software Foundation (ASF) under one or more
> -# contributor license agreements. See the NOTICE file distributed with
> -# this work for additional information regarding copyright ownership.
> -# The ASF licenses this file to You under the Apache License, Version 2.0
> -# (the "License"); you may not use this file except in compliance with
> -# the License. You may obtain a copy of the License at
> -#
> -# http://www.apache.org/licenses/LICENSE-2.0
> -#
> -# Unless required by applicable law or agreed to in writing, software
> -# distributed under the License is distributed on an "AS IS" BASIS,
> -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> -# See the License for the specific language governing permissions and
> -# limitations under the License.
> -
> -"""
> -Blocking executor
> -"""
> -
> -from aria.utils import imports
> -from .base import BaseExecutor
> -
> -
> -class CurrentThreadBlockingExecutor(BaseExecutor):
> - """
> - Executor which runs tasks in the current thread (blocking)
> - """
> -
> - def execute(self, task):
> - self._task_started(task)
> - try:
> - task_func = imports.load_attribute(task.operation_mapping)
> - task_func(ctx=task.context, **task.inputs)
> - self._task_succeeded(task)
> - except BaseException as e:
> - self._task_failed(task, exception=e)
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/multiprocess.py
> b/aria/orchestrator/workflows/executor/multiprocess.py
> index d770e07..0537381 100644
> --- a/aria/orchestrator/workflows/executor/multiprocess.py
> +++ b/aria/orchestrator/workflows/executor/multiprocess.py
> @@ -17,6 +17,7 @@
> Multiprocess based executor
> """
>
> +import collections
> import multiprocessing
> import threading
>
> @@ -26,6 +27,11 @@ from aria.utils import imports
> from .base import BaseExecutor
>
>
> +_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
> +_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
> +_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
> +
> +
> class MultiprocessExecutor(BaseExecutor):
> """
> Executor which runs tasks in a multiprocess environment
> @@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
> self._listener_thread = threading.Thread(target=self._listener)
> self._listener_thread.daemon = True
> self._listener_thread.start()
> - self._pool = multiprocessing.Pool(processes=pool_size)
> -
> - def execute(self, task):
> - self._tasks[task.id] = task
> - self._pool.apply_async(_multiprocess_handler, args=(
> - self._queue,
> - task.context,
> - task.id,
> - task.operation_mapping,
> - task.inputs))
> + self._pool = multiprocessing.Pool(processes=pool_size,
> maxtasksperchild=1)
>
> def close(self):
> self._pool.close()
> self._stopped = True
> self._pool.join()
> + self._manager.shutdown()
> + self._manager.join()
> self._listener_thread.join()
>
> + def execute(self, task):
> + self._tasks[task.id] = task
> + self._pool.apply_async(_handler, kwds={
> + 'queue': self._queue,
> + 'ctx': task.context,
> + 'task_id': task.id,
> + 'operation_mapping': task.operation_mapping,
> + 'operation_inputs': task.inputs
> + })
> +
> + def _remove_task(self, task_id):
> + return self._tasks.pop(task_id)
> +
> def _listener(self):
> while not self._stopped:
> try:
> message = self._queue.get(timeout=1)
> - if message.type == 'task_started':
> + if isinstance(message, _TaskStarted):
> self._task_started(self._tasks[message.task_id])
> - elif message.type == 'task_succeeded':
> + elif isinstance(message, _TaskSucceeded):
> self._task_succeeded(self._
> remove_task(message.task_id))
> - elif message.type == 'task_failed':
> + elif isinstance(message, _TaskFailed):
> self._task_failed(self._remove_task(message.task_id),
> exception=jsonpickle.loads(
> message.exception))
> else:
> - # TODO: something
> - raise RuntimeError()
> + raise RuntimeError('Invalid state')
> # Daemon threads
> except BaseException:
> pass
>
> - def _remove_task(self, task_id):
> - return self._tasks.pop(task_id)
> -
> -
> -class _MultiprocessMessage(object):
> -
> - def __init__(self, type, task_id, exception=None):
> - self.type = type
> - self.task_id = task_id
> - self.exception = exception
> -
>
> -def _multiprocess_handler(queue, ctx, task_id, operation_mapping,
> operation_inputs):
> - queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
> +def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
> + queue.put(_TaskStarted(task_id))
> try:
> task_func = imports.load_attribute(operation_mapping)
> task_func(ctx=ctx, **operation_inputs)
> - queue.put(_MultiprocessMessage(type='task_succeeded',
> task_id=task_id))
> + queue.put(_TaskSucceeded(task_id))
> except BaseException as e:
> - queue.put(_MultiprocessMessage(type='task_failed',
> task_id=task_id,
> - exception=jsonpickle.dumps(e)))
> + queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
> ----------------------------------------------------------------------
> diff --git a/aria/orchestrator/workflows/executor/thread.py
> b/aria/orchestrator/workflows/executor/thread.py
> index 76ceefd..16a58fb 100644
> --- a/aria/orchestrator/workflows/executor/thread.py
> +++ b/aria/orchestrator/workflows/executor/thread.py
> @@ -26,7 +26,8 @@ from .base import BaseExecutor
>
> class ThreadExecutor(BaseExecutor):
> """
> - Executor which runs tasks in a separate thread
> + Executor which runs tasks in a separate thread. It's easier writing
> tests
> + using this executor rather than the full blown multiprocessing
> executor.
> """
>
> def __init__(self, pool_size=1, *args, **kwargs):
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/aria/utils/plugin.py
> ----------------------------------------------------------------------
> diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
> index bb2b974..9868c89 100644
> --- a/aria/utils/plugin.py
> +++ b/aria/utils/plugin.py
> @@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python
> code loading
> """
>
> import os
> +import tempfile
> +import subprocess
> +import sys
> from importlib import import_module
>
> +import wagon
> +
>
> def plugin_installer(path, plugin_suffix, package=None, callback=None):
> """
> @@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None,
> callback=None):
> module = import_module(module_name)
> if callback:
> callback(module)
> +
> +
> +def create(source, destination_dir):
> + return wagon.create(source=source, archive_destination_dir=
> destination_dir)
> +
> +
> +def install(source, prefix):
> + with tempfile.NamedTemporaryFile() as constraint:
> + constraint.write(subprocess.check_output([sys.executable, '-m',
> 'pip', 'freeze']))
> + constraint.flush()
> + wagon.install(
> + source=source,
> + install_args='--prefix="{prefix}"
> --constraint="{constraint}"'.format(
> + prefix=prefix,
> + constraint=constraint.name)
> + )
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/requirements.txt
> ----------------------------------------------------------------------
> diff --git a/requirements.txt b/requirements.txt
> index 7e87c67..7fb842e 100644
> --- a/requirements.txt
> +++ b/requirements.txt
> @@ -23,4 +23,5 @@ Jinja2==2.8
> shortuuid==0.4.3
> CacheControl[filecache]==0.11.6
> clint==0.5.1
> -SQLAlchemy==1.1.4
> \ No newline at end of file
> +SQLAlchemy==1.1.4
> +wagon==0.5.0
>
> http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/
> blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
> ----------------------------------------------------------------------
> diff --git a/tests/orchestrator/workflows/executor/test_executor.py
> b/tests/orchestrator/workflows/executor/test_executor.py
> index a425799..fc81ecc 100644
> --- a/tests/orchestrator/workflows/executor/test_executor.py
> +++ b/tests/orchestrator/workflows/executor/test_executor.py
> @@ -25,7 +25,6 @@ from aria.orchestrator import events
> from aria.orchestrator.workflows.executor import (
> thread,
> multiprocess,
> - blocking,
> # celery
> )
>
> @@ -38,47 +37,24 @@ except ImportError:
> app = None
>
>
> -class TestExecutor(object):
> -
> - @pytest.mark.parametrize('executor_cls,executor_kwargs', [
> - (thread.ThreadExecutor, {'pool_size': 1}),
> - (thread.ThreadExecutor, {'pool_size': 2}),
> - (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
> - (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
> - (blocking.CurrentThreadBlockingExecutor, {}),
> - # (celery.CeleryExecutor, {'app': app})
> - ])
> - def test_execute(self, executor_cls, executor_kwargs):
> - self.executor = executor_cls(**executor_kwargs)
> - expected_value = 'value'
> - successful_task = MockTask(mock_successful_task)
> - failing_task = MockTask(mock_failing_task)
> - task_with_inputs = MockTask(mock_task_with_input,
> inputs={'input': expected_value})
> -
> - for task in [successful_task, failing_task, task_with_inputs]:
> - self.executor.execute(task)
> -
> - @retrying.retry(stop_max_delay=10000, wait_fixed=100)
> - def assertion():
> - assert successful_task.states == ['start', 'success']
> - assert failing_task.states == ['start', 'failure']
> - assert task_with_inputs.states == ['start', 'failure']
> - assert isinstance(failing_task.exception, MockException)
> - assert isinstance(task_with_inputs.exception, MockException)
> - assert task_with_inputs.exception.message == expected_value
> - assertion()
> -
> - def setup_method(self):
> - events.start_task_signal.connect(start_handler)
> - events.on_success_task_signal.connect(success_handler)
> - events.on_failure_task_signal.connect(failure_handler)
> -
> - def teardown_method(self):
> - events.start_task_signal.disconnect(start_handler)
> - events.on_success_task_signal.disconnect(success_handler)
> - events.on_failure_task_signal.disconnect(failure_handler)
> - if hasattr(self, 'executor'):
> - self.executor.close()
> +def test_execute(executor):
> + expected_value = 'value'
> + successful_task = MockTask(mock_successful_task)
> + failing_task = MockTask(mock_failing_task)
> + task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
> expected_value})
> +
> + for task in [successful_task, failing_task, task_with_inputs]:
> + executor.execute(task)
> +
> + @retrying.retry(stop_max_delay=10000, wait_fixed=100)
> + def assertion():
> + assert successful_task.states == ['start', 'success']
> + assert failing_task.states == ['start', 'failure']
> + assert task_with_inputs.states == ['start', 'failure']
> + assert isinstance(failing_task.exception, MockException)
> + assert isinstance(task_with_inputs.exception, MockException)
> + assert task_with_inputs.exception.message == expected_value
> + assertion()
>
>
> def mock_successful_task(**_):
> @@ -128,14 +104,35 @@ class MockTask(object):
> yield self
>
>
> -def start_handler(task, *args, **kwargs):
> - task.states.append('start')
> -
> -
> -def success_handler(task, *args, **kwargs):
> - task.states.append('success')
> -
> -
> -def failure_handler(task, exception, *args, **kwargs):
> - task.states.append('failure')
> - task.exception = exception
> +@pytest.fixture(params=[
> + (thread.ThreadExecutor, {'pool_size': 1}),
> + (thread.ThreadExecutor, {'pool_size': 2}),
> + (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
> + (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
> + # (celery.CeleryExecutor, {'app': app})
> +])
> +def executor(request):
> + executor_cls, executor_kwargs = request.param
> + result = executor_cls(**executor_kwargs)
> + yield result
> + result.close()
> +
> +
> +@pytest.fixture(autouse=True)
> +def register_signals():
> + def start_handler(task, *args, **kwargs):
> + task.states.append('start')
> +
> + def success_handler(task, *args, **kwargs):
> + task.states.append('success')
> +
> + def failure_handler(task, exception, *args, **kwargs):
> + task.states.append('failure')
> + task.exception = exception
> + events.start_task_signal.connect(start_handler)
> + events.on_success_task_signal.connect(success_handler)
> + events.on_failure_task_signal.connect(failure_handler)
> + yield
> + events.start_task_signal.disconnect(start_handler)
> + events.on_success_task_signal.disconnect(success_handler)
> + events.on_failure_task_signal.disconnect(failure_handler)
>
>
Re: [7/7] incubator-ariatosca git commit: ARIA-26 TBD
Posted by Arthur Berezin <ar...@gigaspaces.com>.
Dan, is this commit is part of the plugin mechanism for ARIA?
https://issues.apache.org/jira/browse/ARIATOSCA-26
The title of the commit got me confused.
On Sun, Dec 11, 2016 at 1:40 PM <da...@apache.org> wrote:
ARIA-26 TBD
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e30e09b
Tree:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e30e09b
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e30e09b
Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: 4e30e09b76c70f1491664d4d2fedc397d44a42d3
Parents: c6c92ae
Author: Dan Kilman <da...@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <da...@gigaspaces.com>
Committed: Sun Dec 11 13:40:42 2016 +0200
----------------------------------------------------------------------
aria/cli/commands.py | 4 +-
.../orchestrator/workflows/executor/__init__.py | 2 +-
.../orchestrator/workflows/executor/blocking.py | 36 -------
.../workflows/executor/multiprocess.py | 61 ++++++-----
aria/orchestrator/workflows/executor/thread.py | 3 +-
aria/utils/plugin.py | 21 ++++
requirements.txt | 3 +-
.../workflows/executor/test_executor.py | 103 +++++++++----------
8 files changed, 108 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 3426bb0..fde1de2 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -33,7 +33,7 @@ from ..logger import LoggerMixin
from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
from ..orchestrator.context.workflow import WorkflowContext
from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import
MultiprocessExecutor
from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
from ..parser.consumption import (
ConsumptionContext,
@@ -251,7 +251,7 @@ class ExecuteCommand(BaseCommand):
)
workflow_function =
self._load_workflow_handler(workflow['operation'])
tasks_graph = workflow_function(workflow_context,
**workflow_context.parameters)
- executor = ThreadExecutor()
+ executor = MultiprocessExecutor()
workflow_engine = Engine(executor=executor,
workflow_context=workflow_context,
tasks_graph=tasks_graph)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py
b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
"""
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
from .base import BaseExecutor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py
b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
- """
- Executor which runs tasks in the current thread (blocking)
- """
-
- def execute(self, task):
- self._task_started(task)
- try:
- task_func = imports.load_attribute(task.operation_mapping)
- task_func(ctx=task.context, **task.inputs)
- self._task_succeeded(task)
- except BaseException as e:
- self._task_failed(task, exception=e)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py
b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..0537381 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,6 +17,7 @@
Multiprocess based executor
"""
+import collections
import multiprocessing
import threading
@@ -26,6 +27,11 @@ from aria.utils import imports
from .base import BaseExecutor
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
class MultiprocessExecutor(BaseExecutor):
"""
Executor which runs tasks in a multiprocess environment
@@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
self._listener_thread = threading.Thread(target=self._listener)
self._listener_thread.daemon = True
self._listener_thread.start()
- self._pool = multiprocessing.Pool(processes=pool_size)
-
- def execute(self, task):
- self._tasks[task.id] = task
- self._pool.apply_async(_multiprocess_handler, args=(
- self._queue,
- task.context,
- task.id,
- task.operation_mapping,
- task.inputs))
+ self._pool = multiprocessing.Pool(processes=pool_size,
maxtasksperchild=1)
def close(self):
self._pool.close()
self._stopped = True
self._pool.join()
+ self._manager.shutdown()
+ self._manager.join()
self._listener_thread.join()
+ def execute(self, task):
+ self._tasks[task.id] = task
+ self._pool.apply_async(_handler, kwds={
+ 'queue': self._queue,
+ 'ctx': task.context,
+ 'task_id': task.id,
+ 'operation_mapping': task.operation_mapping,
+ 'operation_inputs': task.inputs
+ })
+
+ def _remove_task(self, task_id):
+ return self._tasks.pop(task_id)
+
def _listener(self):
while not self._stopped:
try:
message = self._queue.get(timeout=1)
- if message.type == 'task_started':
+ if isinstance(message, _TaskStarted):
self._task_started(self._tasks[message.task_id])
- elif message.type == 'task_succeeded':
+ elif isinstance(message, _TaskSucceeded):
self._task_succeeded(self._remove_task(message.task_id))
- elif message.type == 'task_failed':
+ elif isinstance(message, _TaskFailed):
self._task_failed(self._remove_task(message.task_id),
exception=jsonpickle.loads(message.exception))
else:
- # TODO: something
- raise RuntimeError()
+ raise RuntimeError('Invalid state')
# Daemon threads
except BaseException:
pass
- def _remove_task(self, task_id):
- return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
- def __init__(self, type, task_id, exception=None):
- self.type = type
- self.task_id = task_id
- self.exception = exception
-
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping,
operation_inputs):
- queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+ queue.put(_TaskStarted(task_id))
try:
task_func = imports.load_attribute(operation_mapping)
task_func(ctx=ctx, **operation_inputs)
- queue.put(_MultiprocessMessage(type='task_succeeded',
task_id=task_id))
+ queue.put(_TaskSucceeded(task_id))
except BaseException as e:
- queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
- exception=jsonpickle.dumps(e)))
+ queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py
b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
class ThreadExecutor(BaseExecutor):
"""
- Executor which runs tasks in a separate thread
+ Executor which runs tasks in a separate thread. It's easier writing
tests
+ using this executor rather than the full blown multiprocessing
executor.
"""
def __init__(self, pool_size=1, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
index bb2b974..9868c89 100644
--- a/aria/utils/plugin.py
+++ b/aria/utils/plugin.py
@@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code
loading
"""
import os
+import tempfile
+import subprocess
+import sys
from importlib import import_module
+import wagon
+
def plugin_installer(path, plugin_suffix, package=None, callback=None):
"""
@@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None,
callback=None):
module = import_module(module_name)
if callback:
callback(module)
+
+
+def create(source, destination_dir):
+ return wagon.create(source=source,
archive_destination_dir=destination_dir)
+
+
+def install(source, prefix):
+ with tempfile.NamedTemporaryFile() as constraint:
+ constraint.write(subprocess.check_output([sys.executable, '-m',
'pip', 'freeze']))
+ constraint.flush()
+ wagon.install(
+ source=source,
+ install_args='--prefix="{prefix}"
--constraint="{constraint}"'.format(
+ prefix=prefix,
+ constraint=constraint.name)
+ )
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 7e87c67..7fb842e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,5 @@ Jinja2==2.8
shortuuid==0.4.3
CacheControl[filecache]==0.11.6
clint==0.5.1
-SQLAlchemy==1.1.4
\ No newline at end of file
+SQLAlchemy==1.1.4
+wagon==0.5.0
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e30e09b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py
b/tests/orchestrator/workflows/executor/test_executor.py
index a425799..fc81ecc 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,7 +25,6 @@ from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
thread,
multiprocess,
- blocking,
# celery
)
@@ -38,47 +37,24 @@ except ImportError:
app = None
-class TestExecutor(object):
-
- @pytest.mark.parametrize('executor_cls,executor_kwargs', [
- (thread.ThreadExecutor, {'pool_size': 1}),
- (thread.ThreadExecutor, {'pool_size': 2}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
- (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
- (blocking.CurrentThreadBlockingExecutor, {}),
- # (celery.CeleryExecutor, {'app': app})
- ])
- def test_execute(self, executor_cls, executor_kwargs):
- self.executor = executor_cls(**executor_kwargs)
- expected_value = 'value'
- successful_task = MockTask(mock_successful_task)
- failing_task = MockTask(mock_failing_task)
- task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
expected_value})
-
- for task in [successful_task, failing_task, task_with_inputs]:
- self.executor.execute(task)
-
- @retrying.retry(stop_max_delay=10000, wait_fixed=100)
- def assertion():
- assert successful_task.states == ['start', 'success']
- assert failing_task.states == ['start', 'failure']
- assert task_with_inputs.states == ['start', 'failure']
- assert isinstance(failing_task.exception, MockException)
- assert isinstance(task_with_inputs.exception, MockException)
- assert task_with_inputs.exception.message == expected_value
- assertion()
-
- def setup_method(self):
- events.start_task_signal.connect(start_handler)
- events.on_success_task_signal.connect(success_handler)
- events.on_failure_task_signal.connect(failure_handler)
-
- def teardown_method(self):
- events.start_task_signal.disconnect(start_handler)
- events.on_success_task_signal.disconnect(success_handler)
- events.on_failure_task_signal.disconnect(failure_handler)
- if hasattr(self, 'executor'):
- self.executor.close()
+def test_execute(executor):
+ expected_value = 'value'
+ successful_task = MockTask(mock_successful_task)
+ failing_task = MockTask(mock_failing_task)
+ task_with_inputs = MockTask(mock_task_with_input, inputs={'input':
expected_value})
+
+ for task in [successful_task, failing_task, task_with_inputs]:
+ executor.execute(task)
+
+ @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+ def assertion():
+ assert successful_task.states == ['start', 'success']
+ assert failing_task.states == ['start', 'failure']
+ assert task_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_task.exception, MockException)
+ assert isinstance(task_with_inputs.exception, MockException)
+ assert task_with_inputs.exception.message == expected_value
+ assertion()
def mock_successful_task(**_):
@@ -128,14 +104,35 @@ class MockTask(object):
yield self
-def start_handler(task, *args, **kwargs):
- task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
- task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
- task.states.append('failure')
- task.exception = exception
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+ (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+ # (celery.CeleryExecutor, {'app': app})
+])
+def executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
+
+
+@pytest.fixture(autouse=True)
+def register_signals():
+ def start_handler(task, *args, **kwargs):
+ task.states.append('start')
+
+ def success_handler(task, *args, **kwargs):
+ task.states.append('success')
+
+ def failure_handler(task, exception, *args, **kwargs):
+ task.states.append('failure')
+ task.exception = exception
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)