You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 14:21:02 UTC
incubator-ariatosca git commit: wip on executor tests
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 8044a035f -> 979a4b445
wip on executor tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/979a4b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/979a4b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/979a4b44
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 979a4b445bf0261db2bb2632cd93e88ea6d61222
Parents: 8044a03
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 16:49:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 16:49:15 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/executor/base.py | 3 -
aria/orchestrator/workflows/executor/thread.py | 2 +
.../orchestrator/workflows/executor/__init__.py | 42 +++---------
.../workflows/executor/test_executor.py | 70 +++++++++++---------
4 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 089126d..54a9438 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -42,7 +42,6 @@ class BaseExecutor(logger.LoggerMixin):
# itself.
self._task_started(ctx)
self._task_succeeded(ctx)
- ctx.model.task.update(ctx.task)
def close(self):
"""
@@ -58,12 +57,10 @@ class BaseExecutor(logger.LoggerMixin):
def _task_failed(self, ctx, exception, traceback=None):
events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
ctx.model.task.update(ctx.task)
- self.close()
def _task_succeeded(self, ctx):
events.on_success_task_signal.send(ctx)
ctx.model.task.update(ctx.task)
- self.close()
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 8c447b6..074b54b 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -68,6 +68,8 @@ class ThreadExecutor(BaseExecutor):
self._task_failed(ctx,
exception=e,
traceback=exceptions.get_exception_as_string(*sys.exc_info()))
+ finally:
+ break
# Daemon threads
except BaseException as e:
pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 07fb8ad..b8032b7 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,47 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
import logging
-from collections import namedtuple
-from contextlib import contextmanager
import aria
-from aria.modeling import models
-
-
-class MockTask(object):
-
- INFINITE_RETRIES = models.Task.INFINITE_RETRIES
-
- def __init__(self, function, arguments=None, plugin=None, storage=None):
- self.function = self.name = function
- self.plugin_fk = plugin.id if plugin else None
- self.plugin = plugin or None
- self.arguments = arguments or {}
- self.states = []
- self.exception = None
- self.id = str(uuid.uuid4())
- self.logger = logging.getLogger()
- self.context = MockContext(self.id, storage)
- self.attempts_count = 1
- self.max_attempts = 1
- self.ignore_failure = False
- self.interface_name = 'interface_name'
- self.operation_name = 'operation_name'
- self.actor = namedtuple('actor', 'name')(name='actor_name')
- self.model_task = None
-
- for state in models.Task.STATES:
- setattr(self, state.upper(), state)
class MockContext(object):
- def __init__(self, task_id, storage=None):
+ def __init__(self, storage, **kwargs):
self.logger = logging.getLogger('mock_logger')
- self.task_id = task_id
self.model = storage
+ task = storage.task.model_cls(**kwargs)
+ self.model.task.put(task)
+ self._task_id = task.id
+ self.states = []
+ self.exception = None
+
+ @property
+ def task(self):
+ return self.model.task.get(self._task_id)
@property
def serialization_dict(self):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index cfb6975..3fa75ad 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -17,6 +17,8 @@
import pytest
import retrying
+from tests import mock, storage
+
try:
import celery as _celery
app = _celery.Celery()
@@ -25,7 +27,6 @@ except ImportError:
_celery = None
app = None
-import aria
from aria.modeling import models
from aria.orchestrator import events
from aria.orchestrator.workflows.executor import (
@@ -35,41 +36,45 @@ from aria.orchestrator.workflows.executor import (
)
import tests
-from . import MockTask
+from . import MockContext
def _get_function(func):
return '{module}.{func.__name__}'.format(module=__name__, func=func)
-def execute_and_assert(executor, storage=None):
+def execute_and_assert(executor, ctx):
+ node = ctx.model.node.list()[0]
expected_value = 'value'
- successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
- failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
- task_with_inputs = MockTask(_get_function(mock_task_with_input),
- arguments={'input': models.Argument.wrap('input', 'value')},
- storage=storage)
-
- for task in [successful_task, failing_task, task_with_inputs]:
- executor.execute(task)
+ successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task),
+ node=node,
+ _executor=executor)
+ failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node)
+ ctx_with_inputs = ctx.model.task.model_cls(
+ node=node,
+ function=_get_function(mock_task_with_input),
+ arguments={'input': models.Argument.wrap('input', 'value')})
+
+ for task in [successful_ctx, failing_ctx, ctx_with_inputs]:
+ task.execute(ctx)
@retrying.retry(stop_max_delay=10000, wait_fixed=100)
def assertion():
- assert successful_task.states == ['start', 'success']
- assert failing_task.states == ['start', 'failure']
- assert task_with_inputs.states == ['start', 'failure']
- assert isinstance(failing_task.exception, MockException)
- assert isinstance(task_with_inputs.exception, MockException)
- assert task_with_inputs.exception.message == expected_value
+ assert successful_ctx.states == ['start', 'success']
+ assert failing_ctx.states == ['start', 'failure']
+ assert ctx_with_inputs.states == ['start', 'failure']
+ assert isinstance(failing_ctx.exception, MockException)
+ assert isinstance(ctx_with_inputs.exception, MockException)
+ assert ctx_with_inputs.exception.message == expected_value
assertion()
-def test_thread_execute(thread_executor):
- execute_and_assert(thread_executor)
+def test_thread_execute(thread_executor, ctx):
+ execute_and_assert(thread_executor, ctx)
-def test_process_execute(process_executor, storage):
- execute_and_assert(process_executor, storage)
+def test_process_execute(process_executor, ctx):
+ execute_and_assert(process_executor, ctx)
def mock_successful_task(**_):
@@ -94,11 +99,10 @@ class MockException(Exception):
@pytest.fixture
-def storage(tmpdir):
- return aria.application_model_storage(
- aria.storage.sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=str(tmpdir))
- )
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+ yield context
+ storage.release_sqlite_storage(context.model)
@pytest.fixture(params=[
@@ -124,15 +128,15 @@ def process_executor():
@pytest.fixture(autouse=True)
def register_signals():
- def start_handler(task, *args, **kwargs):
- task.states.append('start')
+ def start_handler(ctx, *args, **kwargs):
+ ctx.states.append('start')
- def success_handler(task, *args, **kwargs):
- task.states.append('success')
+ def success_handler(ctx, *args, **kwargs):
+ ctx.states.append('success')
- def failure_handler(task, exception, *args, **kwargs):
- task.states.append('failure')
- task.exception = exception
+ def failure_handler(ctx, exception, *args, **kwargs):
+ ctx.states.append('failure')
+ ctx.exception = exception
events.start_task_signal.connect(start_handler)
events.on_success_task_signal.connect(success_handler)