You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/21 11:56:46 UTC

incubator-ariatosca git commit: removed the usage of the execution graph from the code (currently it still remains in the tests)

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution c211e1064 -> 5cb4f86a6


removed the usage of the execution graph from the code (currently it still remains in the tests)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5cb4f86a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5cb4f86a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5cb4f86a

Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 5cb4f86a6385a1085bc791bc6ce8f5dae4f36bb8
Parents: c211e10
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 14:56:42 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 21 14:56:42 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py | 106 +++++++++++++++---------
 1 file changed, 66 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cb4f86a/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 9f0ddd7..8999232 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -45,18 +45,20 @@ class Engine(logger.LoggerMixin):
         """
         execute the workflow
         """
-        executing_tasks = []
+        task_tracker = _TasksTracker(ctx)
         try:
             events.start_workflow_signal.send(ctx)
             while True:
                 cancel = self._is_cancel(ctx)
                 if cancel:
                     break
-                for task in self._ended_tasks(ctx, executing_tasks):
-                    self._handle_ended_tasks(ctx, task, executing_tasks)
-                for task in self._executable_tasks(ctx):
-                    self._handle_executable_task(ctx, task, executing_tasks)
-                if self._all_tasks_consumed(ctx):
+                for task in task_tracker.ended_tasks:
+                    task_tracker.finished_(task)
+                    self._handle_ended_tasks(task)
+                for task in task_tracker.executable_tasks:
+                    task_tracker.executing_(task)
+                    self._handle_executable_task(ctx, task)
+                if task_tracker.all_tasks_consumed:
                     break
                 else:
                     time.sleep(0.1)
@@ -82,34 +84,7 @@ class Engine(logger.LoggerMixin):
         execution = ctx.model.execution.refresh(ctx.execution)
         return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
-    def _executable_tasks(self, ctx):
-        now = datetime.utcnow()
-        return (
-            task for task in self._tasks_iter(ctx)
-            if task.is_waiting() and task.due_at <= now and \
-            not self._task_has_dependencies(ctx, task)
-        )
-
-    @staticmethod
-    def _ended_tasks(ctx, executing_tasks):
-        for task in executing_tasks:
-            if task.has_ended() and task in ctx._graph:
-                yield task
-
-    @staticmethod
-    def _task_has_dependencies(ctx, task):
-        return len(ctx._graph.pred.get(task, [])) > 0
-
-    @staticmethod
-    def _all_tasks_consumed(ctx):
-        return len(ctx._graph.node) == 0
-
-    @staticmethod
-    def _tasks_iter(ctx):
-        for task in ctx.execution.tasks:
-            yield ctx.model.task.refresh(task)
-
-    def _handle_executable_task(self, ctx, task, executing_tasks):
+    def _handle_executable_task(self, ctx, task):
         task_executor = self._executors[task._executor]
 
         # If the task is a stub, a default context is provided, else it should hold the context cls
@@ -125,16 +100,67 @@ class Engine(logger.LoggerMixin):
             name=task.name
         )
 
-        executing_tasks.append(task)
-
         if not task._stub_type:
             events.sent_task_signal.send(op_ctx)
         task_executor.execute(op_ctx)
 
     @staticmethod
-    def _handle_ended_tasks(ctx, task, executing_tasks):
-        executing_tasks.remove(task)
+    def _handle_ended_tasks(task):
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
-        else:
-            ctx._graph.remove_node(task)
+
+
+class _TasksTracker():
+    def __init__(self, ctx):
+        self._ctx = ctx
+        self._tasks = ctx.execution.tasks
+        self._executable_tasks = list(self._tasks)
+        self._executing_tasks = []
+        self._executed_tasks = []
+
+    @property
+    def all_tasks_consumed(self):
+        return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
+
+    def executing_(self, task):
+        self._executable_tasks.remove(task)
+        self._executing_tasks.append(task)
+
+    def finished_(self, task):
+        self._executing_tasks.remove(task)
+        self._executed_tasks.append(task)
+
+    @property
+    def ended_tasks(self):
+        for task in self.executing_tasks:
+            if task.has_ended():
+                yield task
+
+    @property
+    def executable_tasks(self):
+        now = datetime.utcnow()
+        for task in self._update_tasks(self._executable_tasks):
+            if all([task.is_waiting(),
+                    task.due_at <= now,
+                    all(dependency in self._executed_tasks for dependency in task.dependencies)
+                   ]):
+                yield task
+
+    @property
+    def executing_tasks(self):
+        for task in self._update_tasks(self._executing_tasks):
+            yield task
+
+    @property
+    def executed_tasks(self):
+        for task in self._update_tasks(self._executed_tasks):
+            yield task
+
+    @property
+    def tasks(self):
+        for task in self._update_tasks(self._tasks):
+            yield task
+
+    def _update_tasks(self, tasks):
+        for task in tasks:
+            yield self._ctx.model.task.refresh(task)
\ No newline at end of file