You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/13 14:21:02 UTC

incubator-ariatosca git commit: wip on executor tests

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 8044a035f -> 979a4b445


wip on executor tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/979a4b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/979a4b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/979a4b44

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 979a4b445bf0261db2bb2632cd93e88ea6d61222
Parents: 8044a03
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 13 16:49:15 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 13 16:49:15 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/base.py    |  3 -
 aria/orchestrator/workflows/executor/thread.py  |  2 +
 .../orchestrator/workflows/executor/__init__.py | 42 +++---------
 .../workflows/executor/test_executor.py         | 70 +++++++++++---------
 4 files changed, 49 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 089126d..54a9438 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -42,7 +42,6 @@ class BaseExecutor(logger.LoggerMixin):
             # itself.
             self._task_started(ctx)
             self._task_succeeded(ctx)
-            ctx.model.task.update(ctx.task)
 
     def close(self):
         """
@@ -58,12 +57,10 @@ class BaseExecutor(logger.LoggerMixin):
     def _task_failed(self, ctx, exception, traceback=None):
         events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
         ctx.model.task.update(ctx.task)
-        self.close()
 
     def _task_succeeded(self, ctx):
         events.on_success_task_signal.send(ctx)
         ctx.model.task.update(ctx.task)
-        self.close()
 
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 8c447b6..074b54b 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -68,6 +68,8 @@ class ThreadExecutor(BaseExecutor):
                     self._task_failed(ctx,
                                       exception=e,
                                       traceback=exceptions.get_exception_as_string(*sys.exc_info()))
+                finally:
+                    break
             # Daemon threads
             except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 07fb8ad..b8032b7 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,47 +12,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import uuid
 import logging
-from collections import namedtuple
-from contextlib import contextmanager
 
 import aria
-from aria.modeling import models
-
-
-class MockTask(object):
-
-    INFINITE_RETRIES = models.Task.INFINITE_RETRIES
-
-    def __init__(self, function, arguments=None, plugin=None, storage=None):
-        self.function = self.name = function
-        self.plugin_fk = plugin.id if plugin else None
-        self.plugin = plugin or None
-        self.arguments = arguments or {}
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        self.logger = logging.getLogger()
-        self.context = MockContext(self.id, storage)
-        self.attempts_count = 1
-        self.max_attempts = 1
-        self.ignore_failure = False
-        self.interface_name = 'interface_name'
-        self.operation_name = 'operation_name'
-        self.actor = namedtuple('actor', 'name')(name='actor_name')
-        self.model_task = None
-
-        for state in models.Task.STATES:
-            setattr(self, state.upper(), state)
 
 
 class MockContext(object):
 
-    def __init__(self, task_id, storage=None):
+    def __init__(self, storage, **kwargs):
         self.logger = logging.getLogger('mock_logger')
-        self.task_id = task_id
         self.model = storage
+        task = storage.task.model_cls(**kwargs)
+        self.model.task.put(task)
+        self._task_id = task.id
+        self.states = []
+        self.exception = None
+
+    @property
+    def task(self):
+        return self.model.task.get(self._task_id)
 
     @property
     def serialization_dict(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/979a4b44/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index cfb6975..3fa75ad 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -17,6 +17,8 @@
 import pytest
 import retrying
 
+from tests import mock, storage
+
 try:
     import celery as _celery
     app = _celery.Celery()
@@ -25,7 +27,6 @@ except ImportError:
     _celery = None
     app = None
 
-import aria
 from aria.modeling import models
 from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
@@ -35,41 +36,45 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
-from . import MockTask
+from . import MockContext
 
 
 def _get_function(func):
     return '{module}.{func.__name__}'.format(module=__name__, func=func)
 
 
-def execute_and_assert(executor, storage=None):
+def execute_and_assert(executor, ctx):
+    node = ctx.model.node.list()[0]
     expected_value = 'value'
-    successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
-    failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
-    task_with_inputs = MockTask(_get_function(mock_task_with_input),
-                                arguments={'input': models.Argument.wrap('input', 'value')},
-                                storage=storage)
-
-    for task in [successful_task, failing_task, task_with_inputs]:
-        executor.execute(task)
+    successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task),
+                                              node=node,
+                                              _executor=executor)
+    failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node)
+    ctx_with_inputs = ctx.model.task.model_cls(
+        node=node,
+        function=_get_function(mock_task_with_input),
+        arguments={'input': models.Argument.wrap('input', 'value')})
+
+    for task in [successful_ctx, failing_ctx, ctx_with_inputs]:
+        task.execute(ctx)
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
-        assert successful_task.states == ['start', 'success']
-        assert failing_task.states == ['start', 'failure']
-        assert task_with_inputs.states == ['start', 'failure']
-        assert isinstance(failing_task.exception, MockException)
-        assert isinstance(task_with_inputs.exception, MockException)
-        assert task_with_inputs.exception.message == expected_value
+        assert successful_ctx.states == ['start', 'success']
+        assert failing_ctx.states == ['start', 'failure']
+        assert ctx_with_inputs.states == ['start', 'failure']
+        assert isinstance(failing_ctx.exception, MockException)
+        assert isinstance(ctx_with_inputs.exception, MockException)
+        assert ctx_with_inputs.exception.message == expected_value
     assertion()
 
 
-def test_thread_execute(thread_executor):
-    execute_and_assert(thread_executor)
+def test_thread_execute(thread_executor, ctx):
+    execute_and_assert(thread_executor, ctx)
 
 
-def test_process_execute(process_executor, storage):
-    execute_and_assert(process_executor, storage)
+def test_process_execute(process_executor, ctx):
+    execute_and_assert(process_executor, ctx)
 
 
 def mock_successful_task(**_):
@@ -94,11 +99,10 @@ class MockException(Exception):
 
 
 @pytest.fixture
-def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+def ctx(tmpdir):
+    context = mock.context.simple(str(tmpdir))
+    yield context
+    storage.release_sqlite_storage(context.model)
 
 
 @pytest.fixture(params=[
@@ -124,15 +128,15 @@ def process_executor():
 
 @pytest.fixture(autouse=True)
 def register_signals():
-    def start_handler(task, *args, **kwargs):
-        task.states.append('start')
+    def start_handler(ctx, *args, **kwargs):
+        ctx.states.append('start')
 
-    def success_handler(task, *args, **kwargs):
-        task.states.append('success')
+    def success_handler(ctx, *args, **kwargs):
+        ctx.states.append('success')
 
-    def failure_handler(task, exception, *args, **kwargs):
-        task.states.append('failure')
-        task.exception = exception
+    def failure_handler(ctx, exception, *args, **kwargs):
+        ctx.states.append('failure')
+        ctx.exception = exception
 
     events.start_task_signal.connect(start_handler)
     events.on_success_task_signal.connect(success_handler)