You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/09 11:05:44 UTC
[5/6] incubator-ariatosca git commit: wip
wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e7e1d895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e7e1d895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e7e1d895
Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: e7e1d8951708bea38705a9265df07856b3f2a22c
Parents: 62875b5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jul 2 21:43:43 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jul 9 14:05:24 2017 +0300
----------------------------------------------------------------------
aria/modeling/orchestration.py | 4 ++-
aria/orchestrator/workflow_runner.py | 7 +++--
aria/orchestrator/workflows/core/engine.py | 39 ++++++++++++++++++++-----
3 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 7068557..4d7a5b5 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES,
- CANCELLED: PENDING
+ # Retrying
+ CANCELLED: PENDING,
+ FAILED: PENDING
}
# region one_to_many relationships
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index df1725f..b7de7b5 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
class WorkflowRunner(object):
def __init__(self, model_storage, resource_storage, plugin_manager,
- execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+ execution_id=None, retry_failed=False,
+ service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
@@ -62,6 +63,7 @@ class WorkflowRunner(object):
"and service id with inputs")
self._is_resume = execution_id is not None
+ self._retry_failed = retry_failed
self._model_storage = model_storage
self._resource_storage = resource_storage
@@ -116,7 +118,8 @@ class WorkflowRunner(object):
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+ self._engine.execute(
+ ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..d22b535 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin):
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx, resuming=False):
+ def execute(self, ctx, resuming=False, retry_failing=False):
"""
Executes the workflow.
"""
if resuming:
events.on_resume_workflow_signal.send(ctx)
- tasks_tracker = _TasksTracker(ctx)
+ tasks_tracker = _TasksTracker(ctx, retry_failing)
+
try:
events.start_workflow_signal.send(ctx)
while True:
@@ -124,13 +125,37 @@ class Engine(logger.LoggerMixin):
class _TasksTracker(object):
- def __init__(self, ctx):
+
+ def __init__(self, ctx, retry_failing=False):
self._ctx = ctx
+
+ if retry_failing:
+ self._has_task_ended = self._retry_failed_has_task_ended
+ self._is_task_waiting = self._retry_failed_is_task_waiting
+
self._tasks = ctx.execution.tasks
- self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+ self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)]
self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
self._executing_tasks = []
+ @staticmethod
+ def _retry_failed_has_task_ended(task):
+ # only succeeded tasks have ended (for retry failing)
+ return task.status == task.SUCCESS or (task.status == task.FAILED and task.ignore_failure)
+
+ @staticmethod
+ def _retry_failed_is_task_waiting(task):
+ # failed tasks are waiting to be executed (for retry failing)
+ return task.is_waiting()
+
+ @staticmethod
+ def _has_task_ended(task):
+ return task.has_ended()
+
+ @staticmethod
+ def _is_task_waiting(task):
+ return task.is_waiting()
+
@property
def all_tasks_consumed(self):
return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
@@ -148,15 +173,15 @@ class _TasksTracker(object):
@property
def ended_tasks(self):
for task in self.executing_tasks:
- if task.has_ended():
+ if self._has_task_ended(task):
yield task
@property
def executable_tasks(self):
now = datetime.utcnow()
# we need both lists since retrying task are in the executing task list.
- for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
- if all([task.is_waiting(),
+ for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+ if all([self._is_task_waiting(task),
task.due_at <= now,
all(dependency in self._executed_tasks for dependency in task.dependencies)
]):