You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by em...@apache.org on 2017/07/03 19:56:25 UTC
[08/31] incubator-ariatosca git commit: ARIA-278 remove core tasks
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dd2855..f5fb17a 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,12 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from networkx import topological_sort, DiGraph
+from networkx import topological_sort
+from aria.modeling import models
from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import compile
from aria.orchestrator.workflows.executor import base
-
from tests import mock
from tests import storage
@@ -26,8 +27,8 @@ from tests import storage
def test_task_graph_into_execution_graph(tmpdir):
interface_name = 'Standard'
operation_name = 'create'
- task_context = mock.context.simple(str(tmpdir))
- node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+ workflow_context = mock.context.simple(str(tmpdir))
+ node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
interface = mock.models.create_interface(
node.service,
interface_name,
@@ -35,12 +36,12 @@ def test_task_graph_into_execution_graph(tmpdir):
operation_kwargs=dict(function='test')
)
node.interfaces[interface.name] = interface
- task_context.model.node.update(node)
+ workflow_context.model.node.update(node)
def sub_workflow(name, **_):
return api.task_graph.TaskGraph(name)
- with context.workflow.current.push(task_context):
+ with context.workflow.current.push(workflow_context):
test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
simple_before_task = api.task.OperationTask(
node,
@@ -64,12 +65,9 @@ def test_task_graph_into_execution_graph(tmpdir):
test_task_graph.add_dependency(inner_task_graph, simple_before_task)
test_task_graph.add_dependency(simple_after_task, inner_task_graph)
- # Direct check
- execution_graph = DiGraph()
- core.translation.build_execution_graph(task_graph=test_task_graph,
- execution_graph=execution_graph,
- default_executor=base.StubTaskExecutor())
- execution_tasks = topological_sort(execution_graph)
+ compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
+
+ execution_tasks = topological_sort(workflow_context._graph)
assert len(execution_tasks) == 7
@@ -83,30 +81,23 @@ def test_task_graph_into_execution_graph(tmpdir):
'{0}-End'.format(test_task_graph.id)
]
- assert expected_tasks_names == execution_tasks
-
- assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
- core.task.StartWorkflowTask)
-
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
- simple_before_task)
- assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
- core.task.StartSubWorkflowTask)
+ assert expected_tasks_names == [t._api_id for t in execution_tasks]
+ assert all(isinstance(task, models.Task) for task in execution_tasks)
+ execution_tasks = iter(execution_tasks)
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
- inner_task)
- assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
- core.task.EndSubWorkflowTask)
+ assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
+ _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+ assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
+ _assert_execution_is_api_task(next(execution_tasks), inner_task)
+ assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
+ _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+ assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
- _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
- simple_after_task)
- assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
- core.task.EndWorkflowTask)
- storage.release_sqlite_storage(task_context.model)
+ storage.release_sqlite_storage(workflow_context.model)
def _assert_execution_is_api_task(execution_task, api_task):
- assert execution_task.id == api_task.id
+ assert execution_task._api_id == api_task.id
assert execution_task.name == api_task.name
assert execution_task.function == api_task.function
assert execution_task.actor == api_task.actor
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index ac6d325..83584a6 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -12,69 +12,80 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
import logging
-from collections import namedtuple
+import uuid
from contextlib import contextmanager
import aria
from aria.modeling import models
+class MockContext(object):
+
+ def __init__(self, storage, task_kwargs=None):
+ self.logger = logging.getLogger('mock_logger')
+ self._task_kwargs = task_kwargs or {}
+ self._storage = storage
+ self.task = MockTask(storage, **task_kwargs)
+ self.states = []
+ self.exception = None
+
+ @property
+ def serialization_dict(self):
+ return {
+ 'context_cls': self.__class__,
+ 'context': {
+ 'storage_kwargs': self._storage.serialization_dict,
+ 'task_kwargs': self._task_kwargs
+ }
+ }
+
+ def __getattr__(self, item):
+ return None
+
+ def close(self):
+ pass
+
+ @classmethod
+ def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
+ return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+ task_kwargs=(task_kwargs or {}))
+
+ @property
+ @contextmanager
+ def persist_changes(self):
+ yield
+
+
+class MockActor(object):
+ def __init__(self):
+ self.name = 'actor_name'
+
+
class MockTask(object):
INFINITE_RETRIES = models.Task.INFINITE_RETRIES
- def __init__(self, function, arguments=None, plugin=None, storage=None):
+ def __init__(self, model, function, arguments=None, plugin_fk=None):
self.function = self.name = function
- self.plugin_fk = plugin.id if plugin else None
- self.plugin = plugin or None
+ self.plugin_fk = plugin_fk
self.arguments = arguments or {}
self.states = []
self.exception = None
self.id = str(uuid.uuid4())
self.logger = logging.getLogger()
- self.context = MockContext(storage)
self.attempts_count = 1
self.max_attempts = 1
self.ignore_failure = False
self.interface_name = 'interface_name'
self.operation_name = 'operation_name'
- self.actor = namedtuple('actor', 'name')(name='actor_name')
- self.model_task = None
+ self.actor = MockActor()
+ self.node = self.actor
+ self.model = model
for state in models.Task.STATES:
setattr(self, state.upper(), state)
- @contextmanager
- def _update(self):
- yield self
-
-
-class MockContext(object):
-
- def __init__(self, storage=None):
- self.logger = logging.getLogger('mock_logger')
- self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
- self.model = storage
-
@property
- def serialization_dict(self):
- if self.model:
- return {'context': self.model.serialization_dict, 'context_cls': self.__class__}
- else:
- return {'context_cls': self.__class__, 'context': {}}
-
- def __getattr__(self, item):
- return None
-
- @classmethod
- def instantiate_from_dict(cls, **kwargs):
- if kwargs:
- return cls(storage=aria.application_model_storage(**kwargs))
- else:
- return cls()
-
- @staticmethod
- def close():
- pass
+ def plugin(self):
+ return self.model.plugin.get(self.plugin_fk) if self.plugin_fk else None
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3079c60..32a68e0 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -35,7 +35,7 @@ from aria.orchestrator.workflows.executor import (
)
import tests
-from . import MockTask
+from . import MockContext
def _get_function(func):
@@ -44,11 +44,17 @@ def _get_function(func):
def execute_and_assert(executor, storage=None):
expected_value = 'value'
- successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
- failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
- task_with_inputs = MockTask(_get_function(mock_task_with_input),
- arguments={'input': models.Argument.wrap('input', 'value')},
- storage=storage)
+ successful_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_successful_task))
+ )
+ failing_task = MockContext(
+ storage, task_kwargs=dict(function=_get_function(mock_failing_task))
+ )
+ task_with_inputs = MockContext(
+ storage,
+ task_kwargs=dict(function=_get_function(mock_task_with_input),
+ arguments={'input': models.Argument.wrap('input', 'value')})
+ )
for task in [successful_task, failing_task, task_with_inputs]:
executor.execute(task)
@@ -95,10 +101,10 @@ class MockException(Exception):
@pytest.fixture
def storage(tmpdir):
- return aria.application_model_storage(
- aria.storage.sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=str(tmpdir))
- )
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
@pytest.fixture(params=[
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 058190e..755b9be 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -28,15 +28,17 @@ import tests.resources
from tests.fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
- fs_model as model
)
-from . import MockTask
+from . import MockContext
class TestProcessExecutor(object):
- def test_plugin_execution(self, executor, mock_plugin, storage):
- task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
+ def test_plugin_execution(self, executor, mock_plugin, model):
+ ctx = MockContext(
+ model,
+ task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
+ )
queue = Queue.Queue()
@@ -46,7 +48,7 @@ class TestProcessExecutor(object):
events.on_success_task_signal.connect(handler)
events.on_failure_task_signal.connect(handler)
try:
- executor.execute(task)
+ executor.execute(ctx)
error = queue.get(timeout=60)
# tests/resources/plugins/mock-plugin1 is the plugin installed
# during this tests setup. The module mock_plugin1 contains a single
@@ -63,10 +65,10 @@ class TestProcessExecutor(object):
events.on_success_task_signal.disconnect(handler)
events.on_failure_task_signal.disconnect(handler)
- def test_closed(self, executor):
+ def test_closed(self, executor, model):
executor.close()
with pytest.raises(RuntimeError) as exc_info:
- executor.execute(task=MockTask(function='some.function'))
+ executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
assert 'closed' in exc_info.value.message
@@ -85,8 +87,8 @@ def mock_plugin(plugin_manager, tmpdir):
@pytest.fixture
-def storage(tmpdir):
- return aria.application_model_storage(
- aria.storage.sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=str(tmpdir))
- )
+def model(tmpdir):
+ _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
+ initiator_kwargs=dict(base_dir=str(tmpdir)))
+ yield _storage
+ tests.storage.release_sqlite_storage(_storage)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 5f0b75f..ba98c4f 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -17,7 +17,7 @@ import pytest
from aria import extension
from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, compile
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
@@ -57,8 +57,9 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
- eng.execute()
+ compile.create_execution_tasks(context, graph, executor.__class__)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
out = get_node(context).attributes.get('out').value
assert out['wrapper_arguments'] == arguments
assert out['function_arguments'] == arguments
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1fee85c4/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 7dbcc5a..2f1c325 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -18,7 +18,7 @@ import copy
import pytest
from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, compile
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
from aria.orchestrator.workflows import exceptions
@@ -107,8 +107,9 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
- eng.execute()
+ compile.create_execution_tasks(context, graph, executor.__class__)
+ eng = engine.Engine({executor.__class__: executor})
+ eng.execute(context)
out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
return out.value if out else None