You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/10 13:17:36 UTC
incubator-ariatosca git commit: ARIA-237 Support for resuming failed
workflow executions [Forced Update!]
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions bf753679a -> 91cbb78d3 (forced update)
ARIA-237 Support for resuming failed workflow executions
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/91cbb78d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/91cbb78d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/91cbb78d
Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 91cbb78d352f1711d7a676d89c9e352772bdf793
Parents: b30a7ed
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jul 2 21:43:43 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Jul 10 16:17:21 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 4 +-
aria/orchestrator/workflow_runner.py | 7 +-
aria/orchestrator/workflows/core/engine.py | 18 +++-
.../workflows/core/events_handler.py | 9 +-
tests/modeling/test_models.py | 3 +-
tests/orchestrator/test_workflow_runner.py | 99 +++++++++++++++++---
6 files changed, 117 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index df2643e..4d4f0fe 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES,
- CANCELLED: PENDING
+ # Retrying: CANCELLED and FAILED executions may be moved back to PENDING
+ CANCELLED: PENDING,
+ FAILED: PENDING
}
# region one_to_many relationships
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 47270c0..2bd3043 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
def __init__(self, model_storage, resource_storage, plugin_manager,
- execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+ execution_id=None, retry_failed=False,
+ service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
@@ -62,6 +63,7 @@ class WorkflowRunner(object):
"and service id with inputs")
self._is_resume = execution_id is not None
+ self._retry_failed = retry_failed
self._model_storage = model_storage
self._resource_storage = resource_storage
@@ -116,7 +118,8 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+ self._engine.execute(
+ ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..69505fc 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx, resuming=False):
+ def execute(self, ctx, resuming=False, retry_failed=False):
"""
Executes the workflow.
"""
if resuming:
- events.on_resume_workflow_signal.send(ctx)
+ events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
tasks_tracker = _TasksTracker(ctx)
+
try:
events.start_workflow_signal.send(ctx)
while True:
@@ -68,8 +69,15 @@ class Engine(logger.LoggerMixin):
if cancel:
self._terminate_tasks(tasks_tracker.executing_tasks)
events.on_cancelled_workflow_signal.send(ctx)
- else:
+ elif all(task.status == task.SUCCESS or task.ignore_failure
+ for task in ctx.execution.tasks):
events.on_success_workflow_signal.send(ctx)
+ else:
+ exception = "Tasks {tasks} remain failed".format(
+ tasks=
+ [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure]
+ )
+ events.on_failure_workflow_signal.send(ctx, exception=exception)
except BaseException as e:
# Cleanup any remaining tasks
self._terminate_tasks(tasks_tracker.executing_tasks)
@@ -124,8 +132,10 @@ class Engine(logger.LoggerMixin):
class _TasksTracker(object):
+
def __init__(self, ctx):
self._ctx = ctx
+
self._tasks = ctx.execution.tasks
self._executed_tasks = [task for task in self._tasks if task.has_ended()]
self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
@@ -155,7 +165,7 @@ class _TasksTracker(object):
def executable_tasks(self):
now = datetime.utcnow()
# we need both lists since retrying task are in the executing task list.
- for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
+ for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
if all([task.is_waiting(),
task.due_at <= now,
all(dependency in self._executed_tasks for dependency in task.dependencies)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 37801de..5ac1ce8 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -119,7 +119,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
@events.on_resume_workflow_signal.connect
-def _workflow_resume(workflow_context, *args, **kwargs):
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
@@ -128,6 +128,13 @@ def _workflow_resume(workflow_context, *args, **kwargs):
if not task.has_ended():
task.status = task.PENDING
+ if retry_failed:
+ for task in execution.tasks:
+ if task.status == task.FAILED and not task.ignore_failure:
+ task.attempts_count = 0
+ task.status = task.PENDING
+
+
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index e1167fc..25b4080 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -324,8 +324,7 @@ class TestExecution(object):
Execution.STARTED: [Execution.PENDING],
Execution.CANCELLING: [Execution.PENDING,
Execution.STARTED],
- Execution.FAILED: [Execution.PENDING,
- Execution.STARTED,
+ Execution.FAILED: [Execution.STARTED,
Execution.SUCCEEDED,
Execution.CANCELLED,
Execution.CANCELLING],
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91cbb78d/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index a77d727..112f894 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -51,7 +51,7 @@ custom_events = {
'is_resumed': Event(),
'is_active': Event(),
'execution_cancelled': Event(),
- 'execution_ended': Event()
+ 'execution_failed': Event(),
}
@@ -166,7 +166,8 @@ def test_execute(request, service):
assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
- resuming=False)
+ resuming=False,
+ retry_failed=False)
def test_cancel_execution(request):
@@ -361,7 +362,7 @@ class TestResumableWorkflows(object):
self._create_interface(workflow_context, node, mock_resuming_task)
wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_workflow, thread_executor)
+ workflow_context, mock_two_parallel_tasks_workflow, thread_executor)
wf_thread = Thread(target=wf_runner.execute)
wf_thread.daemon = True
@@ -369,6 +370,7 @@ class TestResumableWorkflows(object):
# Wait for the execution to start
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
@@ -390,6 +392,7 @@ class TestResumableWorkflows(object):
new_wf_runner.execute()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert all(task.status == task.SUCCESS for task in tasks)
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -407,6 +410,7 @@ class TestResumableWorkflows(object):
wf_thread.start()
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 1
assert task.status == task.STARTED
@@ -430,6 +434,7 @@ class TestResumableWorkflows(object):
new_thread_executor.close()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert node.attributes['invocations'].value == 2
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -446,6 +451,7 @@ class TestResumableWorkflows(object):
wf_thread.start()
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 2
@@ -474,10 +480,58 @@ class TestResumableWorkflows(object):
new_thread_executor.close()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert node.attributes['invocations'].value == task.max_attempts - 1
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+ def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_two_different_tasks)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context, mock_two_parallel_tasks_workflow, thread_executor)
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ if custom_events['execution_failed'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 2
+
+ # Second task fails
+ assert any(task.status == task.FAILED for task in tasks)
+ # First task passes
+ assert any(task.status == task.SUCCESS for task in tasks)
+ assert wf_runner.execution.status in wf_runner.execution.FAILED
+
+ custom_events['is_resumed'].set()
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ retry_failed=True,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 3
+ assert all(task.status == task.SUCCESS for task in tasks)
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
@staticmethod
@pytest.fixture
def thread_executor():
@@ -524,20 +578,20 @@ class TestResumableWorkflows(object):
def execution_cancelled(*args, **kwargs):
custom_events['execution_cancelled'].set()
- def execution_ended(*args, **kwargs):
- custom_events['execution_ended'].set()
+ def execution_failed(*args, **kwargs):
+ custom_events['execution_failed'].set()
events.on_cancelled_workflow_signal.connect(execution_cancelled)
- events.on_failure_workflow_signal.connect(execution_ended)
+ events.on_failure_workflow_signal.connect(execution_failed)
yield
events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
- events.on_failure_workflow_signal.disconnect(execution_ended)
+ events.on_failure_workflow_signal.disconnect(execution_failed)
for event in custom_events.values():
event.clear()
@workflow
-def mock_parallel_workflow(ctx, graph):
+def mock_two_parallel_tasks_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(
api.task.OperationTask(
@@ -563,11 +617,8 @@ def mock_resuming_task(ctx):
def mock_single_task_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(
- api.task.OperationTask(node,
- interface_name='aria.interfaces.lifecycle',
- operation_name='create',
- retry_interval=1,
- max_attempts=10),
+ api.task.OperationTask(
+ node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=10)
)
@@ -600,3 +651,25 @@ def mock_stuck_task(ctx):
if not custom_events['is_active'].isSet():
custom_events['is_active'].set()
time.sleep(5)
+
+
+@workflow
+def mock_two_parallel_tasks_workflow(ctx, graph):
+ node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ graph.add_tasks(
+ api.task.OperationTask(
+ node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+ api.task.OperationTask(
+ node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+ )
+
+
+@operation
+def mock_two_different_tasks(ctx):
+ ctx.node.attributes['invocations'] += 1
+
+ # The first task should end gracefully
+ if ctx.node.attributes['invocations'] == 2:
+ # The second task should fail only before resuming
+ if not custom_events['is_resumed'].isSet():
+ raise FailingTask("First execution should fail")