You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/14 15:05:35 UTC

incubator-ariatosca git commit: executor test fixes

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 546e7af7a -> 4ad3600b2


executor test fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4ad3600b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4ad3600b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4ad3600b

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 4ad3600b2cf7edb1190e7dc76f2d2d39e5805391
Parents: 546e7af
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 18:05:30 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 18:05:30 2017 +0300

----------------------------------------------------------------------
 .../orchestrator/workflows/executor/__init__.py | 17 +++++++
 .../workflows/executor/test_executor.py         | 53 +++++++++++++-------
 .../workflows/executor/test_process_executor.py | 42 ++++++++++++----
 .../executor/test_process_executor_extension.py |  7 ++-
 .../test_process_executor_tracked_changes.py    |  7 ++-
 5 files changed, 93 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index b8032b7..87910e9 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -15,6 +15,7 @@
 import logging
 
 import aria
+from aria.orchestrator.context.operation import NodeOperationContext
 
 
 class MockContext(object):
@@ -54,3 +55,19 @@ class MockContext(object):
     @staticmethod
     def close():
         pass
+
+
+def put_to_storage_and_get_ctx(ctx, task):
+    ctx.model.task.put(task)
+    op_ctx = NodeOperationContext(
+        model_storage=ctx.model,
+        resource_storage=ctx.resource,
+        workdir=ctx._workdir,
+        task_id=task.id,
+        actor_id=task.actor.id if task.actor else None,
+        service_id=task.execution.service.id,
+        execution_id=task.execution.id,
+        name=task.name
+    )
+    op_ctx.states = []
+    return op_ctx

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 410a982..d91b3e0 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -18,6 +18,7 @@ import pytest
 import retrying
 
 from tests import mock, storage
+from . import put_to_storage_and_get_ctx
 
 try:
     import celery as _celery
@@ -44,24 +45,31 @@ def _get_function(func):
 
 def execute_and_assert(executor, ctx):
     node = ctx.model.node.list()[0]
-    execution = ctx.model.execution.list()[0]
     expected_value = 'value'
-    successful_ctx = models.Task(function=_get_function(mock_successful_task),
-                                 node=node, _executor=executor, execution=execution)
-    failing_ctx = models.Task(
-        function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution)
-    ctx_with_inputs = models.Task(
-        node=node,
-        function=_get_function(mock_task_with_input),
-        arguments={'input': models.Argument.wrap('input', 'value')},
-        _executor=executor,
-        execution=execution)
-
-    ctx.model.execution.update(execution)
+    successful_ctx = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            function=_get_function(mock_successful_task), node=node, execution=ctx.execution
+        )
+    )
+    failing_ctx = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            function=_get_function(mock_failing_task), node=node, execution=ctx.execution
+        )
+    )
+    ctx_with_inputs = put_to_storage_and_get_ctx(
+        ctx,
+        models.Task(
+            node=node,
+            function=_get_function(mock_task_with_input),
+            arguments={'input': models.Argument.wrap('input', 'value')},
+            execution=ctx.execution
+        )
+    )
 
     for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
-        op_ctx.states = []
-        op_ctx.execute(ctx)
+        executor.execute(op_ctx)
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
@@ -111,9 +119,18 @@ def ctx(tmpdir):
     storage.release_sqlite_storage(context.model)
 
 
-@pytest.fixture
-def thread_executor():
-    return thread.ThreadExecutor
+@pytest.fixture(params=[
+    (thread.ThreadExecutor, {'pool_size': 1}),
+    (thread.ThreadExecutor, {'pool_size': 2}),
+    # subprocess needs to load a tests module so we explicitly add the root directory as if
+    # the project has been installed in editable mode
+    # (celery.CeleryExecutor, {'app': app})
+])
+def thread_executor(request):
+    executor_cls, executor_kwargs = request.param
+    result = executor_cls(**executor_kwargs)
+    yield result
+    result.close()
 
 
 @pytest.fixture

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index bca2ea3..6f9ce77 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -18,24 +18,35 @@ import Queue
 
 import pytest
 
-import aria
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
+from aria.modeling import models
 
 import tests.storage
 import tests.resources
+from tests import mock, storage
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
     fs_model as model
 )
+from tests.orchestrator.workflows.executor import put_to_storage_and_get_ctx
 
 
 class TestProcessExecutor(object):
 
-    def test_plugin_execution(self, executor, mock_plugin, storage):
-        task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
+    def test_plugin_execution(self, executor, mock_plugin, ctx):
+        node = next(ctx.nodes)
+        task_ctx = put_to_storage_and_get_ctx(
+            ctx,
+            models.Task(
+                function='mock_plugin1.operation',
+                plugin_fk=mock_plugin.id,
+                node=node,
+                execution=ctx.execution
+            )
+        )
 
         queue = Queue.Queue()
 
@@ -45,7 +56,7 @@ class TestProcessExecutor(object):
         events.on_success_task_signal.connect(handler)
         events.on_failure_task_signal.connect(handler)
         try:
-            executor.execute(task)
+            executor.execute(task_ctx)
             error = queue.get(timeout=60)
             # tests/resources/plugins/mock-plugin1 is the plugin installed
             # during this tests setup. The module mock_plugin1 contains a single
@@ -62,10 +73,19 @@ class TestProcessExecutor(object):
             events.on_success_task_signal.disconnect(handler)
             events.on_failure_task_signal.disconnect(handler)
 
-    def test_closed(self, executor):
+    def test_closed(self, ctx, executor):
         executor.close()
+        node = next(ctx.nodes)
+        task_ctx = put_to_storage_and_get_ctx(
+            ctx,
+            models.Task(
+                function='mock_plugin1.operation',
+                node=node,
+                execution=ctx.execution
+            )
+        )
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=MockTask(function='some.function'))
+            executor.execute(task_ctx)
         assert 'closed' in exc_info.value.message
 
 
@@ -84,8 +104,8 @@ def mock_plugin(plugin_manager, tmpdir):
 
 
 @pytest.fixture
-def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+def ctx(tmpdir):
+    context = mock.context.simple(str(tmpdir))
+    context.states = []  # attach to the context instance, not to the `ctx` fixture function
+    yield context
+    storage.release_sqlite_storage(context.model)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 5f0b75f..b4c99d2 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -19,7 +19,7 @@ from aria import extension
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 
 import tests
 from tests import mock
@@ -57,7 +57,10 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    execution_graph = workflow_runner.get_execution_graph(context.execution)
+    eng = engine.Engine(executor, context, execution_graph)
     eng.execute()
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 7dbcc5a..57bf7bd 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -20,7 +20,7 @@ import pytest
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 from aria.orchestrator.workflows import exceptions
 
 import tests
@@ -107,7 +107,10 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    execution_graph = workflow_runner.get_execution_graph(context.execution)
+    eng = engine.Engine(executor, context, execution_graph)
     eng.execute()
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None