You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/21 14:56:31 UTC
incubator-ariatosca git commit: review 1 fix
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-236-Resumable-workflow-executions 2fc9896cf -> 3de374534
review 1 fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3de37453
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3de37453
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3de37453
Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 3de37453468ef39f4cf3ea8af8e540f2c31ee58b
Parents: 2fc9896
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 17:56:27 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 21 17:56:27 2017 +0300
----------------------------------------------------------------------
aria/cli/commands/executions.py | 7 +-
aria/orchestrator/workflow_runner.py | 24 ++---
aria/orchestrator/workflows/core/engine.py | 8 +-
tests/orchestrator/test_workflow_runner.py | 122 +++++++++++++-----------
4 files changed, 90 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index a6f06c3..b337e84 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -135,8 +135,8 @@ def start(workflow_name,
workflow_runner = \
WorkflowRunner(
- inputs, model_storage, resource_storage, plugin_manager,
- service_id=service.id, workflow_name=workflow_name, executor=executor,
+ model_storage, resource_storage, plugin_manager,
+ service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
)
logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
@@ -158,7 +158,6 @@ def start(workflow_name,
@aria.pass_plugin_manager
@aria.pass_logger
def resume(execution_id,
- inputs,
dry,
task_max_attempts,
task_retry_interval,
@@ -171,7 +170,7 @@ def resume(execution_id,
workflow_runner = \
WorkflowRunner(
- inputs, model_storage, resource_storage, plugin_manager,
+ model_storage, resource_storage, plugin_manager,
execution_id=execution_id, executor=executor,
task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 99b6edf..829b8cd 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,8 +37,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
- def __init__(self, inputs, model_storage, resource_storage, plugin_manager,
- execution_id=None, service_id=None, workflow_name=None, executor=None,
+ def __init__(self, model_storage, resource_storage, plugin_manager,
+ execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
@@ -55,23 +55,29 @@ class WorkflowRunner(object):
:param task_retry_interval: Retry interval in between retry attempts of a failing task
"""
+ self._is_resume = execution_id is not None
+ self._is_start = \
+ workflow_name is not None and service_id is not None and execution_id is None
+
self._model_storage = model_storage
self._resource_storage = resource_storage
# the IDs are stored rather than the models themselves, so this module could be used
# by several threads without raising errors on model objects shared between threads
- if workflow_name is not None and service_id is not None and execution_id is None:
+ if self._is_start:
self._service_id = service_id
self._workflow_name = workflow_name
self._validate_workflow_exists_for_service()
self._execution_id = self._create_execution_model(inputs).id
- elif execution_id is not None:
+ elif self._is_resume:
self._execution_id = execution_id
self._service_id = self.execution.service.id
self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
else:
- raise exceptions.InvalidWorkflowRunnerParams("")
+ raise exceptions.InvalidWorkflowRunnerParams(
+ "Either provide execution id in order to resume a workflow or workflow name "
+ "and service id with inputs")
self._workflow_context = WorkflowContext(
name=self.__class__.__name__,
@@ -89,17 +95,13 @@ class WorkflowRunner(object):
# transforming the execution inputs to dict, to pass them to the workflow function
execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
- if execution_id is None:
- # Not an existing execution
+ if self._is_start:
workflow_fn = self._get_workflow_fn()
tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__)
self._engine = engine.Engine(executors={executor.__class__: executor})
- if workflow_name is None:
- self._engine.resume_execution(self._workflow_context)
-
@property
def execution_id(self):
return self._execution_id
@@ -113,7 +115,7 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context)
+ self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 7f14500..a7e5148 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,11 +41,15 @@ class Engine(logger.LoggerMixin):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx):
+ def execute(self, ctx, resuming=False):
"""
execute the workflow
"""
executing_tasks = []
+
+ if resuming:
+ self._resuming_execution(ctx)
+
try:
events.start_workflow_signal.send(ctx)
while True:
@@ -69,7 +73,7 @@ class Engine(logger.LoggerMixin):
raise
@staticmethod
- def resume_execution(ctx):
+ def _resuming_execution(ctx):
events.on_resume_workflow_signal.send(ctx)
@staticmethod
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3de37453/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 36ebfe1..ec4b40b 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,7 +14,7 @@
# limitations under the License.
import json
-from threading import Thread
+from threading import Thread, Event
from datetime import datetime
import mock
@@ -23,6 +23,7 @@ import pytest
from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
+from aria.orchestrator.events import on_cancelled_workflow_signal
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
from aria.orchestrator.workflows import api
@@ -45,7 +46,11 @@ from ..fixtures import ( # pylint: disable=unused-import
resource_storage as resource
)
-global_test_holder = {}
+global_test_holder = {
+ 'is_resumed': Event(),
+ 'is_active': Event(),
+ 'execution_ended': Event()
+}
def test_undeclared_workflow(request):
@@ -309,7 +314,9 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
class TestResumableWorkflows(object):
def test_resume_workflow(self, workflow_context, executor):
- self._create_interface(workflow_context, mock_success_task)
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+
+ self._create_interface(workflow_context, node, mock_resuming_task)
service = workflow_context.service
service.workflows['custom_workflow'] = tests_mock.models.create_operation(
@@ -327,47 +334,39 @@ class TestResumableWorkflows(object):
workflow_name='custom_workflow',
executor=executor)
wf_thread = Thread(target=wf_runner.execute)
- try:
- wf_thread.start()
-
- # Wait for the execution to start
- while global_test_holder.get('state') != 'active':
- pass
- global_test_holder['state'] = 'terminated'
- wf_runner.cancel()
-
- # Make sure the execution was canceled and the task has not ended
- while wf_runner.execution.status != workflow_context.execution.CANCELLED:
- pass
- task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
- assert task.status in (task.FAILED, task.RETRYING)
- assert global_test_holder['state'] == 'idle'
-
- # Create a new workflow runner, with an existing execution id. This would cause
- # the old execution to restart.
- new_wf_runner = WorkflowRunner(
- service_id=wf_runner.service.id,
- inputs={},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- execution_id=wf_runner.execution.id,
- executor=executor)
-
- # Set the resumed to True, for the execution to succeed.
- global_test_holder['resumed'] = True
- new_wf_runner.execute()
-
- # Wait for it to finish and assert changes.
- while global_test_holder.get('state') != 'ended':
- pass
- assert task.status == task.SUCCESS
- assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
- except:
- global_test_holder['state'] = 'terminated'
- wf_thread.join(0.5)
- raise
+ wf_thread.daemon = True
+ wf_thread.start()
+
+ # Wait for the execution to start
+ global_test_holder['is_active'].wait(5)
+ wf_runner.cancel()
+
+ # Make sure the execution was canceled and the task has not ended
+
+ global_test_holder['execution_ended'].wait(10)
+ first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
+ assert first_task.status == first_task.SUCCESS
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+ global_test_holder['is_resumed'].set()
+ assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+
+ # Create a new workflow runner, with an existing execution id. This would cause
+ # the old execution to restart.
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=executor)
+
+ new_wf_runner.execute()
+
+ # Wait for it to finish and assert changes.
+ assert second_task.status == second_task.SUCCESS
+ assert node.attributes['invocations'].value == 2
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@staticmethod
@pytest.fixture
@@ -386,8 +385,7 @@ class TestResumableWorkflows(object):
storage.release_sqlite_storage(workflow_context.model)
@staticmethod
- def _create_interface(ctx, func, arguments=None):
- node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ def _create_interface(ctx, node, func, arguments=None):
interface_name = 'aria.interfaces.lifecycle'
operation_kwargs = dict(function='{name}.{func.__name__}'.format(
name=__name__, func=func))
@@ -411,22 +409,38 @@ class TestResumableWorkflows(object):
return engine.Engine(executors={executor.__class__: executor})
+ @pytest.fixture(autouse=True)
+ def register_to_events(self):
+ def execution_ended(**_):
+ global_test_holder['execution_ended'].set()
+
+ on_cancelled_workflow_signal.connect(execution_ended)
+ yield
+ on_cancelled_workflow_signal.disconnect(execution_ended)
@workflow
def mock_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(
api.task.OperationTask(
+ node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
+ api.task.OperationTask(
node, interface_name='aria.interfaces.lifecycle', operation_name='create')
)
@operation
-def mock_success_task(**_):
- global_test_holder['state'] = 'active'
- while global_test_holder.get('state') != 'terminated':
- if global_test_holder.get('resumed') is True:
- global_test_holder['state'] = 'ended'
- return
- global_test_holder['state'] = 'idle'
- raise Exception("The operation was terminated")
+def mock_resuming_task(ctx):
+
+ if 'invocations' not in ctx.node.attributes:
+ # This is the first node invocation
+ ctx.node.attributes['invocations'] = 1
+ else:
+ global_test_holder['is_active'].set()
+ # This is the second node invocation
+ if global_test_holder['is_resumed'].isSet():
+ # if resume was called, increase by one. o/w fail the execution - second task should
+ # fail as long it was not a part of resuming the workflow
+ ctx.node.attributes['invocations'] += 1
+ else:
+ raise BaseException("wasn't resumed yet")