You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/07/09 11:05:44 UTC

[5/6] incubator-ariatosca git commit: wip

wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e7e1d895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e7e1d895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e7e1d895

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: e7e1d8951708bea38705a9265df07856b3f2a22c
Parents: 62875b5
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Jul 2 21:43:43 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Jul 9 14:05:24 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py             |  4 ++-
 aria/orchestrator/workflow_runner.py       |  7 +++--
 aria/orchestrator/workflows/core/engine.py | 39 ++++++++++++++++++++-----
 3 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 7068557..4d7a5b5 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
         PENDING: (STARTED, CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
         CANCELLING: END_STATES,
-        CANCELLED: PENDING
+        # Retrying
+        CANCELLED: PENDING,
+        FAILED: PENDING
     }
 
     # region one_to_many relationships

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index df1725f..b7de7b5 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 class WorkflowRunner(object):
 
     def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+                 execution_id=None, retry_failed=False,
+                 service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
@@ -62,6 +63,7 @@ class WorkflowRunner(object):
                 "and service id with inputs")
 
         self._is_resume = execution_id is not None
+        self._retry_failed = retry_failed
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -116,7 +118,8 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+        self._engine.execute(
+            ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..d22b535 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin):
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx, resuming=False):
+    def execute(self, ctx, resuming=False, retry_failing=False):
         """
         Executes the workflow.
         """
         if resuming:
             events.on_resume_workflow_signal.send(ctx)
 
-        tasks_tracker = _TasksTracker(ctx)
+        tasks_tracker = _TasksTracker(ctx, retry_failing)
+
         try:
             events.start_workflow_signal.send(ctx)
             while True:
@@ -124,13 +125,37 @@ class Engine(logger.LoggerMixin):
 
 
 class _TasksTracker(object):
-    def __init__(self, ctx):
+
+    def __init__(self, ctx, retry_failing=False):
         self._ctx = ctx
+
+        if retry_failing:
+            self._has_task_ended = self._retry_failed_has_task_ended
+            self._is_task_waiting = self._retry_failed_is_task_waiting
+
         self._tasks = ctx.execution.tasks
-        self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+        self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
         self._executing_tasks = []
 
+    @staticmethod
+    def _retry_failed_has_task_ended(task):
+        # only succeeded tasks have ended (for retry failing)
+        return task.status == task.SUCCESS or (task.status == task.FAILED and task.ignore_failure)
+
+    @staticmethod
+    def _retry_failed_is_task_waiting(task):
+        # failed tasks are waiting to be executed (for retry failing)
+        return task.is_waiting()
+
+    @staticmethod
+    def _has_task_ended(task):
+        return task.has_ended()
+
+    @staticmethod
+    def _is_task_waiting(task):
+        return task.is_waiting()
+
     @property
     def all_tasks_consumed(self):
         return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
@@ -148,15 +173,15 @@ class _TasksTracker(object):
     @property
     def ended_tasks(self):
         for task in self.executing_tasks:
-            if task.has_ended():
+            if self._has_task_ended(task):
                 yield task
 
     @property
     def executable_tasks(self):
         now = datetime.utcnow()
         # we need both lists since retrying task are in the executing task list.
-        for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
-            if all([task.is_waiting(),
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+            if all([self._is_task_waiting(task),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)
                    ]):