You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/22 11:29:05 UTC
[3/4] incubator-ariatosca git commit: ARIA-236 Resumable workflow
executions
ARIA-236 Resumable workflow executions
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/75112ab0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/75112ab0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/75112ab0
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 75112ab052c7de7162901a7a46b5e843316cc63d
Parents: a751934
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Jun 19 17:44:45 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Thu Jun 22 14:21:54 2017 +0300
----------------------------------------------------------------------
aria/cli/commands/executions.py | 57 +++++-
aria/cli/logger.py | 4 +-
aria/modeling/orchestration.py | 3 +-
aria/orchestrator/context/workflow.py | 5 +
aria/orchestrator/events.py | 1 +
aria/orchestrator/exceptions.py | 7 +
aria/orchestrator/workflow_runner.py | 43 +++--
aria/orchestrator/workflows/core/engine.py | 6 +-
.../workflows/core/events_handler.py | 7 +
tests/mock/__init__.py | 2 +-
tests/mock/models.py | 14 +-
tests/modeling/test_models.py | 5 +-
tests/orchestrator/test_workflow_runner.py | 175 +++++++++++++++++--
13 files changed, 282 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6176ea2..b337e84 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -134,18 +134,63 @@ def start(workflow_name,
executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
workflow_runner = \
- WorkflowRunner(workflow_name, service.id, inputs,
- model_storage, resource_storage, plugin_manager,
- executor, task_max_attempts, task_retry_interval)
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
+ task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ )
+ logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
- execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
+@executions.command(name='resume',
+ short_help='Resume a workflow')
+@aria.argument('execution-id')
+@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
+@aria.options.dry_execution
+@aria.options.task_max_attempts()
+@aria.options.task_retry_interval()
+@aria.options.mark_pattern()
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_resource_storage
+@aria.pass_plugin_manager
+@aria.pass_logger
+def resume(execution_id,
+ dry,
+ task_max_attempts,
+ task_retry_interval,
+ mark_pattern,
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ logger):
+ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
+
+ workflow_runner = \
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ execution_id=execution_id, executor=executor,
+ task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ )
+
+ logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
+ execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
+ workflow_runner.execution.workflow_name)
execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
name=execution_thread_name)
- logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
execution_thread.start()
- log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id)
+ last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0
+ log_iterator = cli_logger.ModelLogIterator(model_storage,
+ workflow_runner.execution_id,
+ offset=last_task_id)
try:
while execution_thread.is_alive():
execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 5de3701..96f3fb3 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -115,8 +115,8 @@ class Logging(object):
class ModelLogIterator(object):
- def __init__(self, model_storage, execution_id, filters=None, sort=None):
- self._last_visited_id = 0
+ def __init__(self, model_storage, execution_id, filters=None, sort=None, offset=0):
+ self._last_visited_id = offset
self._model_storage = model_storage
self._execution_id = execution_id
self._additional_filters = filters or {}
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 17d2476..276b68e 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin):
VALID_TRANSITIONS = {
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
- CANCELLING: END_STATES
+ CANCELLING: END_STATES,
+ CANCELLED: PENDING
}
@orm.validates('status')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index aa5a786..adcd635 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -97,10 +97,15 @@ class WorkflowContext(BaseContext):
@property
def _graph(self):
+ # Constructing a graph with only not ended nodes
if self._execution_graph is None:
graph = DiGraph()
for task in self.execution.tasks:
+ if task.has_ended():
+ continue
for dependency in task.dependencies:
+ if dependency.has_ended():
+ continue
graph.add_edge(dependency, task)
self._execution_graph = graph
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..aa1b5bc 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -34,3 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
on_success_workflow_signal = signal('on_success_workflow_signal')
on_failure_workflow_signal = signal('on_failure_workflow_signal')
+on_resume_workflow_signal = signal('on_resume_workflow_signal')
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 8d3dcc6..71b6401 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
Raised when attempting to import a workflow's code but the implementation is not found
"""
pass
+
+
+class InvalidWorkflowRunnerParams(AriaError):
+ """
+ Raised when invalid combination of arguments is passed to the workflow runner
+ """
+ pass
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 9e6b3ad..3ccb1ee 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,9 +37,9 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
- def __init__(self, workflow_name, service_id, inputs,
- model_storage, resource_storage, plugin_manager,
- executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+ def __init__(self, model_storage, resource_storage, plugin_manager,
+ execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+ task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
Manages a single workflow execution on a given service.
@@ -55,28 +55,36 @@ class WorkflowRunner(object):
:param task_retry_interval: Retry interval in between retry attempts of a failing task
"""
+ if not (execution_id or (workflow_name and service_id)):
+ exceptions.InvalidWorkflowRunnerParams(
+ "Either provide execution id in order to resume a workflow or workflow name "
+ "and service id with inputs")
+
+ self._is_resume = execution_id is not None
+
self._model_storage = model_storage
self._resource_storage = resource_storage
- self._workflow_name = workflow_name
# the IDs are stored rather than the models themselves, so this module could be used
# by several threads without raising errors on model objects shared between threads
- self._service_id = service_id
-
- self._validate_workflow_exists_for_service()
- workflow_fn = self._get_workflow_fn()
-
- execution = self._create_execution_model(inputs)
- self._execution_id = execution.id
+ if self._is_resume:
+ self._execution_id = execution_id
+ self._service_id = self.execution.service.id
+ self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+ else:
+ self._service_id = service_id
+ self._workflow_name = workflow_name
+ self._validate_workflow_exists_for_service()
+ self._execution_id = self._create_execution_model(inputs).id
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
model_storage=self._model_storage,
resource_storage=resource_storage,
service_id=service_id,
- execution_id=execution.id,
- workflow_name=workflow_name,
+ execution_id=self._execution_id,
+ workflow_name=self._workflow_name,
task_max_attempts=task_max_attempts,
task_retry_interval=task_retry_interval)
@@ -86,9 +94,10 @@ class WorkflowRunner(object):
# transforming the execution inputs to dict, to pass them to the workflow function
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
- self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
- compile.create_execution_tasks(
- self._workflow_context, self._tasks_graph, executor.__class__)
+ if not self._is_resume:
+ workflow_fn = self._get_workflow_fn()
+ tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+ compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
self._engine = engine.Engine(executors={executor.__class__: executor})
@@ -105,7 +114,7 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context)
+ self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 9f0ddd7..d5a6e70 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,11 +41,15 @@ class Engine(logger.LoggerMixin):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx):
+ def execute(self, ctx, resuming=False):
"""
execute the workflow
"""
executing_tasks = []
+
+ if resuming:
+ events.on_resume_workflow_signal.send(ctx)
+
try:
events.start_workflow_signal.send(ctx)
while True:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 2d71d2a..7380db8 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
execution.ended_at = datetime.utcnow()
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, *args, **kwargs):
+ with workflow_context.persist_changes:
+ execution = workflow_context.execution
+ execution.status = execution.PENDING
+
+
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 9004b4c..9183b77 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from . import models, context, topology, operations
+from . import models, context, topology, operations, workflow
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 7f6bbea..23a14bd 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name,
)
-def create_interface(service, interface_name, operation_name, operation_kwargs=None,
- interface_kwargs=None):
- the_type = service.service_template.interface_types.get_descendant('test_interface_type')
-
+def create_operation(operation_name, operation_kwargs=None):
if operation_kwargs and operation_kwargs.get('arguments'):
operation_kwargs['arguments'] = dict(
(argument_name, models.Argument.wrap(argument_name, argument_value))
for argument_name, argument_value in operation_kwargs['arguments'].iteritems()
if argument_value is not None)
- operation = models.Operation(
+ return models.Operation(
name=operation_name,
**(operation_kwargs or {})
)
+
+
+def create_interface(service, interface_name, operation_name, operation_kwargs=None,
+ interface_kwargs=None):
+ the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+ operation = create_operation(operation_name, operation_kwargs)
+
return models.Interface(
type=the_type,
operations=_dictify(operation),
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 464f432..bbc7352 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -314,7 +314,7 @@ class TestExecution(object):
Execution.CANCELLING],
Execution.FAILED: [Execution.FAILED],
Execution.SUCCEEDED: [Execution.SUCCEEDED],
- Execution.CANCELLED: [Execution.CANCELLED]
+ Execution.CANCELLED: [Execution.CANCELLED, Execution.PENDING]
}
invalid_transitions = {
@@ -334,8 +334,7 @@ class TestExecution(object):
Execution.FAILED,
Execution.CANCELLED,
Execution.CANCELLING],
- Execution.CANCELLED: [Execution.PENDING,
- Execution.STARTED,
+ Execution.CANCELLED: [Execution.STARTED,
Execution.FAILED,
Execution.SUCCEEDED,
Execution.CANCELLING],
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/75112ab0/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 40f9035..ae82476 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,21 +14,31 @@
# limitations under the License.
import json
+from threading import Thread, Event
from datetime import datetime
-import pytest
import mock
+import pytest
from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
+from aria.orchestrator.events import on_cancelled_workflow_signal
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, compile
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+ workflow,
+ operation,
+)
-from ..mock import (
- topology,
- workflow as workflow_mocks
+from tests import (
+ mock as tests_mock,
+ storage
)
+
from ..fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
@@ -36,6 +46,16 @@ from ..fixtures import ( # pylint: disable=unused-import
resource_storage as resource
)
+events = {
+ 'is_resumed': Event(),
+ 'is_active': Event(),
+ 'execution_ended': Event()
+}
+
+
+class TimeoutError(BaseException):
+ pass
+
def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
@@ -59,8 +79,8 @@ def test_builtin_workflow_instantiation(request):
# validates the workflow runner instantiates properly when provided with a builtin workflow
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
workflow_runner = _create_workflow_runner(request, 'install')
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 2 # expecting two WorkflowTasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 18 # expecting 18 tasks for 2 node topology
def test_custom_workflow_instantiation(request):
@@ -68,8 +88,8 @@ def test_custom_workflow_instantiation(request):
# (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
mock_workflow = _setup_mock_workflow_in_service(request)
workflow_runner = _create_workflow_runner(request, mock_workflow)
- tasks = list(workflow_runner._tasks_graph.tasks)
- assert len(tasks) == 0 # mock workflow creates no tasks
+ tasks = list(workflow_runner.execution.tasks)
+ assert len(tasks) == 2 # mock workflow creates only start workflow and end workflow task
def test_existing_active_executions(request, service, model):
@@ -139,7 +159,8 @@ def test_execute(request, service):
assert engine_kwargs['ctx'].service.id == service.id
assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
- mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context)
+ mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
+ resuming=False)
def test_cancel_execution(request):
@@ -240,7 +261,7 @@ def test_workflow_function_parameters(request, tmpdir):
@pytest.fixture
def service(model):
# sets up a service in the storage
- service_id = topology.create_simple_topology_two_nodes(model)
+ service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
service = model.service.get(service_id)
return service
@@ -251,7 +272,7 @@ def _setup_mock_workflow_in_service(request, inputs=None):
service = request.getfuncargvalue('service')
resource = request.getfuncargvalue('resource')
- source = workflow_mocks.__file__
+ source = tests_mock.workflow.__file__
resource.service_template.upload(str(service.service_template.id), source)
mock_workflow_name = 'test_workflow'
arguments = {}
@@ -293,3 +314,135 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
resource_storage=resource,
plugin_manager=plugin_manager,
**task_configuration_kwargs)
+
+
+class TestResumableWorkflows(object):
+
+ def test_resume_workflow(self, workflow_context, executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_resuming_task)
+
+ service = workflow_context.service
+ service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+ 'custom_workflow',
+ operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+ )
+ workflow_context.model.service.update(service)
+
+ wf_runner = WorkflowRunner(
+ service_id=workflow_context.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ workflow_name='custom_workflow',
+ executor=executor)
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ # Wait for the execution to start
+ if events['is_active'].wait(5) is False:
+ raise TimeoutError("is_active wasn't set to True")
+ wf_runner.cancel()
+
+ if events['execution_ended'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
+ assert first_task.status == first_task.SUCCESS
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+ events['is_resumed'].set()
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=executor)
+
+ new_wf_runner.execute()
+
+ # Wait for it to finish and assert changes.
+ assert second_task.status == second_task.SUCCESS
+ assert node.attributes['invocations'].value == 3
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+ @staticmethod
+ @pytest.fixture
+ def executor():
+ result = thread.ThreadExecutor()
+ try:
+ yield result
+ finally:
+ result.close()
+
+ @staticmethod
+ @pytest.fixture
+ def workflow_context(tmpdir):
+ workflow_context = tests_mock.context.simple(str(tmpdir))
+ yield workflow_context
+ storage.release_sqlite_storage(workflow_context.model)
+
+ @staticmethod
+ def _create_interface(ctx, node, func, arguments=None):
+ interface_name = 'aria.interfaces.lifecycle'
+ operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+ name=__name__, func=func))
+ if arguments:
+ # the operation has to declare the arguments before those may be passed
+ operation_kwargs['arguments'] = arguments
+ operation_name = 'create'
+ interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+ operation_kwargs=operation_kwargs)
+ node.interfaces[interface.name] = interface
+ ctx.model.node.update(node)
+
+ return node, interface_name, operation_name
+
+ @staticmethod
+ def _engine(workflow_func, workflow_context, executor):
+ graph = workflow_func(ctx=workflow_context)
+ execution = workflow_context.execution
+ compile.create_execution_tasks(execution, graph, executor.__class__)
+ workflow_context.execution = execution
+
+ return engine.Engine(executors={executor.__class__: executor})
+
+ @pytest.fixture(autouse=True)
+ def register_to_events(self):
+ def execution_ended(*args, **kwargs):
+ events['execution_ended'].set()
+
+ on_cancelled_workflow_signal.connect(execution_ended)
+ yield
+ on_cancelled_workflow_signal.disconnect(execution_ended)
+
+
+@workflow
+def mock_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(
+ api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
+ api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create')
+ )
+
+
+@operation
+def mock_resuming_task(ctx):
+ ctx.node.attributes['invocations'] += 1
+
+ if ctx.node.attributes['invocations'] != 1:
+ events['is_active'].set()
+ if not events['is_resumed'].isSet():
+ # if resume was called, increase by one. o/w fail the execution - second task should
+ # fail as long it was not a part of resuming the workflow
+ raise BaseException("wasn't resumed yet")