You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/21 12:51:31 UTC

incubator-ariatosca git commit: fix retying mechanism [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution 92773e206 -> a86ba295e (forced update)


fix retying mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a86ba295
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a86ba295
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a86ba295

Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: a86ba295e94569f5cd29601f1da906b2af053e4a
Parents: a66b74b
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 15:25:44 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 21 15:51:25 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/workflow.py      |  2 --
 aria/orchestrator/workflows/core/engine.py | 15 +++++++++------
 2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a86ba295/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 2da3d4c..18334f3 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -20,8 +20,6 @@ Workflow and operation contexts
 import threading
 from contextlib import contextmanager
 
-from networkx import DiGraph
-
 from .exceptions import ContextException
 from .common import BaseContext
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a86ba295/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 8999232..603914e 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -53,8 +53,8 @@ class Engine(logger.LoggerMixin):
                 if cancel:
                     break
                 for task in task_tracker.ended_tasks:
-                    task_tracker.finished_(task)
                     self._handle_ended_tasks(task)
+                    task_tracker.finished_(task)
                 for task in task_tracker.executable_tasks:
                     task_tracker.executing_(task)
                     self._handle_executable_task(ctx, task)
@@ -110,7 +110,7 @@ class Engine(logger.LoggerMixin):
             raise exceptions.ExecutorException('Workflow failed')
 
 
-class _TasksTracker():
+class _TasksTracker(object):
     def __init__(self, ctx):
         self._ctx = ctx
         self._tasks = ctx.execution.tasks
@@ -123,8 +123,10 @@ class _TasksTracker():
         return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
 
     def executing_(self, task):
-        self._executable_tasks.remove(task)
-        self._executing_tasks.append(task)
+        # Task executing could be retrying (thus removed and added earlier)
+        if task not in self._executing_tasks:
+            self._executable_tasks.remove(task)
+            self._executing_tasks.append(task)
 
     def finished_(self, task):
         self._executing_tasks.remove(task)
@@ -139,7 +141,8 @@ class _TasksTracker():
     @property
     def executable_tasks(self):
         now = datetime.utcnow()
-        for task in self._update_tasks(self._executable_tasks):
+        # we need both list since retrying task are in the executing task list.
+        for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
             if all([task.is_waiting(),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)
@@ -163,4 +166,4 @@ class _TasksTracker():
 
     def _update_tasks(self, tasks):
         for task in tasks:
-            yield self._ctx.model.task.refresh(task)
\ No newline at end of file
+            yield self._ctx.model.task.refresh(task)