You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/14 15:05:35 UTC
incubator-ariatosca git commit: executor test fixes
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-278-Remove-core-tasks 546e7af7a -> 4ad3600b2
executor test fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4ad3600b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4ad3600b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4ad3600b
Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 4ad3600b2cf7edb1190e7dc76f2d2d39e5805391
Parents: 546e7af
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 18:05:30 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 18:05:30 2017 +0300
----------------------------------------------------------------------
.../orchestrator/workflows/executor/__init__.py | 17 +++++++
.../workflows/executor/test_executor.py | 53 +++++++++++++-------
.../workflows/executor/test_process_executor.py | 42 ++++++++++++----
.../executor/test_process_executor_extension.py | 7 ++-
.../test_process_executor_tracked_changes.py | 7 ++-
5 files changed, 93 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index b8032b7..87910e9 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -15,6 +15,7 @@
import logging
import aria
+from aria.orchestrator.context.operation import NodeOperationContext
class MockContext(object):
@@ -54,3 +55,19 @@ class MockContext(object):
@staticmethod
def close():
pass
+
+
+def put_to_storage_and_get_ctx(ctx, task):
+ ctx.model.task.put(task)
+ op_ctx = NodeOperationContext(
+ model_storage=ctx.model,
+ resource_storage=ctx.resource,
+ workdir=ctx._workdir,
+ task_id=task.id,
+ actor_id=task.actor.id if task.actor else None,
+ service_id=task.execution.service.id,
+ execution_id=task.execution.id,
+ name=task.name
+ )
+ op_ctx.states = []
+ return op_ctx
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 410a982..d91b3e0 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -18,6 +18,7 @@ import pytest
import retrying
from tests import mock, storage
+from . import put_to_storage_and_get_ctx
try:
import celery as _celery
@@ -44,24 +45,31 @@ def _get_function(func):
def execute_and_assert(executor, ctx):
node = ctx.model.node.list()[0]
- execution = ctx.model.execution.list()[0]
expected_value = 'value'
- successful_ctx = models.Task(function=_get_function(mock_successful_task),
- node=node, _executor=executor, execution=execution)
- failing_ctx = models.Task(
- function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution)
- ctx_with_inputs = models.Task(
- node=node,
- function=_get_function(mock_task_with_input),
- arguments={'input': models.Argument.wrap('input', 'value')},
- _executor=executor,
- execution=execution)
-
- ctx.model.execution.update(execution)
+ successful_ctx = put_to_storage_and_get_ctx(
+ ctx,
+ models.Task(
+ function=_get_function(mock_successful_task), node=node, execution=ctx.execution
+ )
+ )
+ failing_ctx = put_to_storage_and_get_ctx(
+ ctx,
+ models.Task(
+ function=_get_function(mock_failing_task), node=node, execution=ctx.execution
+ )
+ )
+ ctx_with_inputs = put_to_storage_and_get_ctx(
+ ctx,
+ models.Task(
+ node=node,
+ function=_get_function(mock_task_with_input),
+ arguments={'input': models.Argument.wrap('input', 'value')},
+ execution=ctx.execution
+ )
+ )
for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
- op_ctx.states = []
- op_ctx.execute(ctx)
+ executor.execute(op_ctx)
@retrying.retry(stop_max_delay=10000, wait_fixed=100)
def assertion():
@@ -111,9 +119,18 @@ def ctx(tmpdir):
storage.release_sqlite_storage(context.model)
-@pytest.fixture
-def thread_executor():
- return thread.ThreadExecutor
+@pytest.fixture(params=[
+ (thread.ThreadExecutor, {'pool_size': 1}),
+ (thread.ThreadExecutor, {'pool_size': 2}),
+ # subprocess needs to load a tests module so we explicitly add the root directory as if
+ # the project has been installed in editable mode
+ # (celery.CeleryExecutor, {'app': app})
+])
+def thread_executor(request):
+ executor_cls, executor_kwargs = request.param
+ result = executor_cls(**executor_kwargs)
+ yield result
+ result.close()
@pytest.fixture
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index bca2ea3..6f9ce77 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -18,24 +18,35 @@ import Queue
import pytest
-import aria
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process
+from aria.modeling import models
import tests.storage
import tests.resources
+from tests import mock, storage
from tests.fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
fs_model as model
)
+from tests.orchestrator.workflows.executor import put_to_storage_and_get_ctx
class TestProcessExecutor(object):
- def test_plugin_execution(self, executor, mock_plugin, storage):
- task = MockTask('mock_plugin1.operation', plugin=mock_plugin, storage=storage)
+ def test_plugin_execution(self, executor, mock_plugin, ctx):
+ node = next(ctx.nodes)
+ task_ctx = put_to_storage_and_get_ctx(
+ ctx,
+ models.Task(
+ function='mock_plugin1.operation',
+ plugin_fk=mock_plugin.id,
+ node=node,
+ execution=ctx.execution
+ )
+ )
queue = Queue.Queue()
@@ -45,7 +56,7 @@ class TestProcessExecutor(object):
events.on_success_task_signal.connect(handler)
events.on_failure_task_signal.connect(handler)
try:
- executor.execute(task)
+ executor.execute(task_ctx)
error = queue.get(timeout=60)
# tests/resources/plugins/mock-plugin1 is the plugin installed
# during this tests setup. The module mock_plugin1 contains a single
@@ -62,10 +73,19 @@ class TestProcessExecutor(object):
events.on_success_task_signal.disconnect(handler)
events.on_failure_task_signal.disconnect(handler)
- def test_closed(self, executor):
+ def test_closed(self, ctx, executor):
executor.close()
+ node = next(ctx.nodes)
+ task_ctx = put_to_storage_and_get_ctx(
+ ctx,
+ models.Task(
+ function='mock_plugin1.operation',
+ node=node,
+ execution=ctx.execution
+ )
+ )
with pytest.raises(RuntimeError) as exc_info:
- executor.execute(task=MockTask(function='some.function'))
+ executor.execute(task_ctx)
assert 'closed' in exc_info.value.message
@@ -84,8 +104,8 @@ def mock_plugin(plugin_manager, tmpdir):
@pytest.fixture
-def storage(tmpdir):
- return aria.application_model_storage(
- aria.storage.sql_mapi.SQLAlchemyModelAPI,
- initiator_kwargs=dict(base_dir=str(tmpdir))
- )
+def ctx(tmpdir):
+ context = mock.context.simple(str(tmpdir))
+ ctx.states = []
+ yield context
+ storage.release_sqlite_storage(context.model)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 5f0b75f..b4c99d2 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -19,7 +19,7 @@ from aria import extension
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine
from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
import tests
from tests import mock
@@ -57,7 +57,10 @@ def test_decorate_extension(context, executor):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+ context.execution = context.execution
+ execution_graph = workflow_runner.get_execution_graph(context.execution)
+ eng = engine.Engine(executor, context, execution_graph)
eng.execute()
out = get_node(context).attributes.get('out').value
assert out['wrapper_arguments'] == arguments
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4ad3600b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 7dbcc5a..57bf7bd 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -20,7 +20,7 @@ import pytest
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine
from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
from aria.orchestrator.workflows import exceptions
import tests
@@ -107,7 +107,10 @@ def _run_workflow(context, executor, op_func, arguments=None):
graph.add_tasks(task)
return graph
graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
- eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+ workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+ context.execution = context.execution
+ execution_graph = workflow_runner.get_execution_graph(context.execution)
+ eng = engine.Engine(executor, context, execution_graph)
eng.execute()
out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
return out.value if out else None