Posted to commits@ariatosca.apache.org by em...@apache.org on 2017/07/03 19:56:25 UTC

[08/31] incubator-ariatosca git commit: ARIA-278 remove core tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dd2855..f5fb17a 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from networkx import topological_sort, DiGraph
+from networkx import topological_sort
 
+from aria.modeling import models
 from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import compile
 from aria.orchestrator.workflows.executor import base
-
 from tests import mock
 from tests import storage
 
@@ -26,8 +27,8 @@ from tests import storage
 def test_task_graph_into_execution_graph(tmpdir):
     interface_name = 'Standard'
     operation_name = 'create'
-    task_context = mock.context.simple(str(tmpdir))
-    node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+    workflow_context = mock.context.simple(str(tmpdir))
+    node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     interface = mock.models.create_interface(
         node.service,
         interface_name,
@@ -35,12 +36,12 @@ def test_task_graph_into_execution_graph(tmpdir):
         operation_kwargs=dict(function='test')
     )
     node.interfaces[interface.name] = interface
-    task_context.model.node.update(node)
+    workflow_context.model.node.update(node)
 
     def sub_workflow(name, **_):
         return api.task_graph.TaskGraph(name)
 
-    with context.workflow.current.push(task_context):
+    with context.workflow.current.push(workflow_context):
         test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
         simple_before_task = api.task.OperationTask(
             node,
@@ -64,12 +65,9 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(inner_task_graph, simple_before_task)
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
-    # Direct check
-    execution_graph = DiGraph()
-    core.translation.build_execution_graph(task_graph=test_task_graph,
-                                           execution_graph=execution_graph,
-                                           default_executor=base.StubTaskExecutor())
-    execution_tasks = topological_sort(execution_graph)
+    compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
+
+    execution_tasks = topological_sort(workflow_context._graph)
 
     assert len(execution_tasks) == 7
 
@@ -83,30 +81,23 @@ def test_task_graph_into_execution_graph(tmpdir):
         '{0}-End'.format(test_task_graph.id)
     ]
 
-    assert expected_tasks_names == execution_tasks
-
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      core.task.StartWorkflowTask)
-
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
-                                  simple_before_task)
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      core.task.StartSubWorkflowTask)
+    assert expected_tasks_names == [t._api_id for t in execution_tasks]
+    assert all(isinstance(task, models.Task) for task in execution_tasks)
+    execution_tasks = iter(execution_tasks)
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
-                                  inner_task)
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
-                      core.task.EndSubWorkflowTask)
+    assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+    assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
+    _assert_execution_is_api_task(next(execution_tasks), inner_task)
+    assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+    assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
-                                  simple_after_task)
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
-                      core.task.EndWorkflowTask)
-    storage.release_sqlite_storage(task_context.model)
+    storage.release_sqlite_storage(workflow_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):
-    assert execution_task.id == api_task.id
+    assert execution_task._api_id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor
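
The rewrite above replaces the old two-step translation (building a standalone DiGraph through core.translation.build_execution_graph) with compile.create_execution_tasks, which records the execution tasks as models.Task rows on the workflow context's own graph. A minimal sketch of the new pattern, assuming a workflow_context and an API task graph already built as in the test above:

    from networkx import topological_sort

    from aria.modeling import models
    from aria.orchestrator.workflows.core import compile
    from aria.orchestrator.workflows.executor import base

    # Compile the API-level task graph into persisted models.Task rows (including
    # the start/end workflow stub tasks) on the context's private execution graph.
    compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)

    # Execution order is recovered by topologically sorting that graph; every node
    # is now a models.Task, with _api_id and _stub_type describing its origin.
    execution_tasks = topological_sort(workflow_context._graph)
    assert all(isinstance(task, models.Task) for task in execution_tasks)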

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index ac6d325..83584a6 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,69 +12,80 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import uuid
 import logging
-from collections import namedtuple
+import uuid
 from contextlib import contextmanager
 
 import aria
 from aria.modeling import models
 
 
+class MockContext(object):
+
+    def __init__(self, storage, task_kwargs=None):
+        self.logger = logging.getLogger('mock_logger')
+        self._task_kwargs = task_kwargs or {}
+        self._storage = storage
+        self.task = MockTask(storage, **task_kwargs)
+        self.states = []
+        self.exception = None
+
+    @property
+    def serialization_dict(self):
+        return {
+            'context_cls': self.__class__,
+            'context': {
+                'storage_kwargs': self._storage.serialization_dict,
+                'task_kwargs': self._task_kwargs
+            }
+        }
+
+    def __getattr__(self, item):
+        return None
+
+    def close(self):
+        pass
+
+    @classmethod
+    def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
+        return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+                   task_kwargs=(task_kwargs or {}))
+
+    @property
+    @contextmanager
+    def persist_changes(self):
+        yield
+
+
+class MockActor(object):
+    def __init__(self):
+        self.name = 'actor_name'
+
+
 class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
 
-    def __init__(self, function, arguments=None, plugin=None, storage=None):
+    def __init__(self, model, function, arguments=None, plugin_fk=None):
         self.function = self.name = function
-        self.plugin_fk = plugin.id if plugin else None
-        self.plugin = plugin or None
+        self.plugin_fk = plugin_fk
         self.arguments = arguments or {}
         self.states = []
         self.exception = None
         self.id = str(uuid.uuid4())
         self.logger = logging.getLogger()
-        self.context = MockContext(storage)
         self.attempts_count = 1
         self.max_attempts = 1
         self.ignore_failure = False
         self.interface_name = 'interface_name'
         self.operation_name = 'operation_name'
-        self.actor = namedtuple('actor', 'name')(name='actor_name')
-        self.model_task = None
+        self.actor = MockActor()
+        self.node = self.actor
+        self.model = model
 
         for state in models.Task.STATES:
             setattr(self, state.upper(), state)
 
-    @contextmanager
-    def _update(self):
-        yield self
-
-
-class MockContext(object):
-
-    def __init__(self, storage=None):
-        self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.model = storage
-
     @property
-    def serialization_dict(self):
-        if self.model:
-            return {'context': self.model.serialization_dict, 'context_cls': self.__class__}
-        else:
-            return {'context_cls': self.__class__, 'context': {}}
-
-    def __getattr__(self, item):
-        return None
-
-    @classmethod
-    def instantiate_from_dict(cls, **kwargs):
-        if kwargs:
-            return cls(storage=aria.application_model_storage(**kwargs))
-        else:
-            return cls()
-
-    @staticmethod
-    def close():
-        pass
+    def plugin(self):
+        return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None
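
With this change MockContext, rather than MockTask, is what the executor tests hand to executor.execute(): it builds its MockTask from task_kwargs, exposes serialization_dict / instantiate_from_dict for the process executor, and resolves the task's plugin lazily through the model storage. A short usage sketch, assuming storage is an aria application model storage as in the fixtures further down, and an executor instance from those same fixtures (the operation path is a placeholder):

    from tests.orchestrator.workflows.executor import MockContext

    # Build a context whose embedded MockTask points at the operation function;
    # 'some_package.some_operation' is a placeholder, not a real module.
    ctx = MockContext(storage, task_kwargs=dict(function='some_package.some_operation'))

    # Executors in this commit receive the context (which carries ctx.task)
    # instead of a bare task object.
    executor.execute(ctx)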

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3079c60..32a68e0 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -35,7 +35,7 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
-from . import MockTask
+from . import MockContext
 
 
 def _get_function(func):
@@ -44,11 +44,17 @@ def _get_function(func):
 
 def execute_and_assert(executor, storage=None):
     expected_value = 'value'
-    successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
-    failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
-    task_with_inputs = MockTask(_get_function(mock_task_with_input),
-                                arguments={'input': models.Argument.wrap('input', 'value')},
-                                storage=storage)
+    successful_task = MockContext(
+        storage, task_kwargs=dict(function=_get_function(mock_successful_task))
+    )
+    failing_task = MockContext(
+        storage, task_kwargs=dict(function=_get_function(mock_failing_task))
+    )
+    task_with_inputs = MockContext(
+        storage,
+        task_kwargs=dict(function=_get_function(mock_task_with_input),
+                         arguments={'input': models.Argument.wrap('input', 'value')})
+    )
 
     for task in [successful_task, failing_task, task_with_inputs]:
         executor.execute(task)
@@ -95,10 +101,10 @@ class MockException(Exception):
 
 @pytest.fixture
 def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+    _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+                                              initiator_kwargs=dict(base_dir=str(tmpdir)))
+    yield _storage
+    tests.storage.release_sqlite_storage(_storage)
 
 
 @pytest.fixture(params=[

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 058190e..755b9be 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -28,15 +28,17 @@ import tests.resources
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
-    fs_model as model
 )
-from . import MockTask
+from . import MockContext
 
 
 class TestProcessExecutor(object):
 
-    def test_plugin_execution(self, executor, mock_plugin, storage):
-        task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
+    def test_plugin_execution(self, executor, mock_plugin, model):
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+        )
 
         queue = Queue.Queue()
 
@@ -46,7 +48,7 @@ class TestProcessExecutor(object):
         events.on_success_task_signal.connect(handler)
         events.on_failure_task_signal.connect(handler)
         try:
-            executor.execute(task)
+            executor.execute(ctx)
             error = queue.get(timeout=60)
             # tests/resources/plugins/mock-plugin1 is the plugin installed
             # during this tests setup. The module mock_plugin1 contains a single
@@ -63,10 +65,10 @@ class TestProcessExecutor(object):
             events.on_success_task_signal.disconnect(handler)
             events.on_failure_task_signal.disconnect(handler)
 
-    def test_closed(self, executor):
+    def test_closed(self, executor, model):
         executor.close()
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=MockTask(function='some.function'))
+            executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
 
@@ -85,8 +87,8 @@ def mock_plugin(plugin_manager, tmpdir):
 
 
 @pytest.fixture
-def storage(tmpdir):
-    return aria.application_model_storage(
-        aria.storage.sql_mapi.SQLAlchemyModelAPI,
-        initiator_kwargs=dict(base_dir=str(tmpdir))
-    )
+def model(tmpdir):
+    _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+                                              initiator_kwargs=dict(base_dir=str(tmpdir)))
+    yield _storage
+    tests.storage.release_sqlite_storage(_storage)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 5f0b75f..ba98c4f 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -17,7 +17,7 @@ import pytest
 
 from aria import extension
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, compile
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 
@@ -57,8 +57,9 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
-    eng.execute()
+    compile.create_execution_tasks(context, graph, executor.__class__)
+    eng = engine.Engine({executor.__class__: executor})
+    eng.execute(context)
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments
     assert out['function_arguments'] == arguments
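
The engine invocation changes the same way in both process-executor test modules: tasks are compiled onto the workflow context up front, the Engine is constructed from a mapping of executor class to executor instance, and the context is supplied at execution time. A condensed sketch of the new calling convention, with context, graph and executor assumed to come from the surrounding fixtures:

    from aria.orchestrator.workflows.core import engine, compile

    # Old: engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
    # New: compile the graph onto the context, then run an Engine keyed by executor class.
    compile.create_execution_tasks(context, graph, executor.__class__)
    eng = engine.Engine({executor.__class__: executor})
    eng.execute(context)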

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 7dbcc5a..2f1c325 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -18,7 +18,7 @@ import copy
 import pytest
 
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, compile
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 from aria.orchestrator.workflows import exceptions
@@ -107,8 +107,9 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
-    eng.execute()
+    compile.create_execution_tasks(context, graph, executor.__class__)
+    eng = engine.Engine({executor.__class__: executor})
+    eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None