You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/21 11:56:46 UTC
incubator-ariatosca git commit: removed the usage of execution graph
from the code (currently still remain in test
Repository: incubator-ariatosca
Updated Branches:
refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution c211e1064 -> 5cb4f86a6
removed the usage of execution graph from the code (currently still remain in test
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5cb4f86a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5cb4f86a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5cb4f86a
Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution
Commit: 5cb4f86a6385a1085bc791bc6ce8f5dae4f36bb8
Parents: c211e10
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 21 14:56:42 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 21 14:56:42 2017 +0300
----------------------------------------------------------------------
aria/orchestrator/workflows/core/engine.py | 106 +++++++++++++++---------
1 file changed, 66 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cb4f86a/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 9f0ddd7..8999232 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -45,18 +45,20 @@ class Engine(logger.LoggerMixin):
"""
execute the workflow
"""
- executing_tasks = []
+ task_tracker = _TasksTracker(ctx)
try:
events.start_workflow_signal.send(ctx)
while True:
cancel = self._is_cancel(ctx)
if cancel:
break
- for task in self._ended_tasks(ctx, executing_tasks):
- self._handle_ended_tasks(ctx, task, executing_tasks)
- for task in self._executable_tasks(ctx):
- self._handle_executable_task(ctx, task, executing_tasks)
- if self._all_tasks_consumed(ctx):
+ for task in task_tracker.ended_tasks:
+ task_tracker.finished_(task)
+ self._handle_ended_tasks(task)
+ for task in task_tracker.executable_tasks:
+ task_tracker.executing_(task)
+ self._handle_executable_task(ctx, task)
+ if task_tracker.all_tasks_consumed:
break
else:
time.sleep(0.1)
@@ -82,34 +84,7 @@ class Engine(logger.LoggerMixin):
execution = ctx.model.execution.refresh(ctx.execution)
return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
- def _executable_tasks(self, ctx):
- now = datetime.utcnow()
- return (
- task for task in self._tasks_iter(ctx)
- if task.is_waiting() and task.due_at <= now and \
- not self._task_has_dependencies(ctx, task)
- )
-
- @staticmethod
- def _ended_tasks(ctx, executing_tasks):
- for task in executing_tasks:
- if task.has_ended() and task in ctx._graph:
- yield task
-
- @staticmethod
- def _task_has_dependencies(ctx, task):
- return len(ctx._graph.pred.get(task, [])) > 0
-
- @staticmethod
- def _all_tasks_consumed(ctx):
- return len(ctx._graph.node) == 0
-
- @staticmethod
- def _tasks_iter(ctx):
- for task in ctx.execution.tasks:
- yield ctx.model.task.refresh(task)
-
- def _handle_executable_task(self, ctx, task, executing_tasks):
+ def _handle_executable_task(self, ctx, task):
task_executor = self._executors[task._executor]
# If the task is a stub, a default context is provided, else it should hold the context cls
@@ -125,16 +100,67 @@ class Engine(logger.LoggerMixin):
name=task.name
)
- executing_tasks.append(task)
-
if not task._stub_type:
events.sent_task_signal.send(op_ctx)
task_executor.execute(op_ctx)
@staticmethod
- def _handle_ended_tasks(ctx, task, executing_tasks):
- executing_tasks.remove(task)
+ def _handle_ended_tasks(task):
if task.status == models.Task.FAILED and not task.ignore_failure:
raise exceptions.ExecutorException('Workflow failed')
- else:
- ctx._graph.remove_node(task)
+
+
+class _TasksTracker():
+ def __init__(self, ctx):
+ self._ctx = ctx
+ self._tasks = ctx.execution.tasks
+ self._executable_tasks = list(self._tasks)
+ self._executing_tasks = []
+ self._executed_tasks = []
+
+ @property
+ def all_tasks_consumed(self):
+ return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
+
+ def executing_(self, task):
+ self._executable_tasks.remove(task)
+ self._executing_tasks.append(task)
+
+ def finished_(self, task):
+ self._executing_tasks.remove(task)
+ self._executed_tasks.append(task)
+
+ @property
+ def ended_tasks(self):
+ for task in self.executing_tasks:
+ if task.has_ended():
+ yield task
+
+ @property
+ def executable_tasks(self):
+ now = datetime.utcnow()
+ for task in self._update_tasks(self._executable_tasks):
+ if all([task.is_waiting(),
+ task.due_at <= now,
+ all(dependency in self._executed_tasks for dependency in task.dependencies)
+ ]):
+ yield task
+
+ @property
+ def executing_tasks(self):
+ for task in self._update_tasks(self._executing_tasks):
+ yield task
+
+ @property
+ def executed_tasks(self):
+ for task in self._update_tasks(self._executed_tasks):
+ yield task
+
+ @property
+ def tasks(self):
+ for task in self._update_tasks(self._tasks):
+ yield task
+
+ def _update_tasks(self, tasks):
+ for task in tasks:
+ yield self._ctx.model.task.refresh(task)
\ No newline at end of file