Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/20 14:22:48 UTC

incubator-ariatosca git commit: review fixes [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 43371e01f -> f84ad06b2 (forced update)


review fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f84ad06b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f84ad06b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f84ad06b

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: f84ad06b2bf7c8a631db68d8c650a3202c187515
Parents: 507796e
Author: max-orlov <ma...@gigaspaces.com>
Authored: Tue Jun 20 15:34:57 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Tue Jun 20 17:22:37 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  14 +-
 aria/orchestrator/context/operation.py          |   2 +-
 aria/orchestrator/context/workflow.py           |   2 +-
 aria/orchestrator/workflow_runner.py            |   9 +-
 aria/orchestrator/workflows/api/task.py         |   6 +
 aria/orchestrator/workflows/core/engine.py      | 128 +++----------------
 .../workflows/core/events_handler.py            |  19 ++-
 aria/orchestrator/workflows/core/task.py        | 119 +++++++++++++++++
 aria/orchestrator/workflows/events_logging.py   |  56 ++++----
 aria/orchestrator/workflows/executor/base.py    |   4 +-
 aria/orchestrator/workflows/executor/dry.py     |   2 +-
 tests/end2end/testenv.py                        |   3 -
 tests/orchestrator/context/__init__.py          |   8 +-
 tests/orchestrator/context/test_operation.py    |   2 +-
 tests/orchestrator/context/test_serialize.py    |  10 +-
 .../orchestrator/execution_plugin/test_local.py |   8 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   8 +-
 tests/orchestrator/test_workflow_runner.py      |   4 +-
 .../orchestrator/workflows/core/test_engine.py  |  10 +-
 .../orchestrator/workflows/core/test_events.py  |   9 +-
 .../test_task_graph_into_execution_graph.py     |  24 ++--
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 .../executor/test_process_executor_extension.py |   7 +-
 .../test_process_executor_tracked_changes.py    |   7 +-
 24 files changed, 230 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 007eefa..17d2476 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -304,10 +304,11 @@ class TaskBase(mixins.ModelMixin):
     started_at = Column(DateTime, default=None)
     ended_at = Column(DateTime, default=None)
     attempts_count = Column(Integer, default=1)
-    api_id = Column(String)
 
+    _api_id = Column(String)
     _executor = Column(PickleType)
     _context_cls = Column(PickleType)
+    _stub_type = Column(Enum(*STUB_TYPES))
 
     @declared_attr
     def logs(cls):
@@ -336,8 +337,6 @@ class TaskBase(mixins.ModelMixin):
     interface_name = Column(String)
     operation_name = Column(String)
 
-    stub_type = Column(Enum(*STUB_TYPES))
-
     @property
     def actor(self):
         """
@@ -410,21 +409,18 @@ class TaskBase(mixins.ModelMixin):
         return self.status in (self.SUCCESS, self.FAILED)
 
     def is_waiting(self):
-        if self.stub_type:
+        if self._stub_type:
             return not self.has_ended()
         else:
             return self.status in (self.PENDING, self.RETRYING)
 
     @classmethod
     def from_api_task(cls, api_task, executor, **kwargs):
-        from aria.orchestrator import context
         instantiation_kwargs = {}
 
         if hasattr(api_task.actor, 'outbound_relationships'):
-            context_cls = context.operation.NodeOperationContext
             instantiation_kwargs['node'] = api_task.actor
         elif hasattr(api_task.actor, 'source_node'):
-            context_cls = context.operation.RelationshipOperationContext
             instantiation_kwargs['relationship'] = api_task.actor
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
@@ -445,8 +441,8 @@ class TaskBase(mixins.ModelMixin):
                 'plugin': api_task.plugin,
                 'function': api_task.function,
                 'arguments': api_task.arguments,
-                'api_id': api_task.id,
-                '_context_cls': context_cls,
+                '_api_id': api_task.id,
+                '_context_cls': api_task._context_cls,
                 '_executor': executor,
             }
         )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 07bf297..d43b847 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -109,7 +109,7 @@ class BaseOperationContext(common.BaseContext):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
         self.model.task.update(self.task)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 4b7573f..aa5a786 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -109,7 +109,7 @@ class WorkflowContext(BaseContext):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
         self._model.execution.update(self.execution)
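
The rename from track_changes to persist_changes states what the context
manager actually does: it yields immediately and persists the wrapped model
object on exit. A minimal sketch of the call-site pattern (the status
assignment is illustrative; the real handlers follow in events_handler.py):

    with ctx.persist_changes:
        ctx.task.status = ctx.task.STARTED  # flushed to storage on exit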
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index c30ec4b..a57a34e 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -24,7 +24,7 @@ from datetime import datetime
 from . import exceptions
 from .context.workflow import WorkflowContext
 from .workflows import builtin
-from .workflows.core import engine
+from .workflows.core import engine, task
 from .workflows.executor.process import ProcessExecutor
 from ..modeling import models
 from ..modeling import utils as modeling_utils
@@ -87,12 +87,9 @@ class WorkflowRunner(object):
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
         self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-        engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
+        task.create_execution_tasks(self._workflow_context, self._tasks_graph, executor.__class__)
 
-        # Update the state
-        self._model_storage.execution.update(execution)
-
-        self._engine = engine.Engine(default_executor=executor)
+        self._engine = engine.Engine(executors={executor.__class__: executor})
 
     @property
     def execution_id(self):
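
With this hunk the runner derives execution tasks through the new core task
module and hands the engine an explicit mapping of executor classes to
instances. A condensed sketch of the resulting wiring, assuming a plain
ProcessExecutor() (the actual construction in WorkflowRunner passes more
arguments than shown):

    from aria.orchestrator.workflows.core import engine, task
    from aria.orchestrator.workflows.executor.process import ProcessExecutor

    executor = ProcessExecutor()  # assumed default; real kwargs not in this diff

    # derive and persist execution tasks from the API-level task graph
    task.create_execution_tasks(workflow_context, tasks_graph, executor.__class__)

    # the engine resolves each task's executor through this mapping
    eng = engine.Engine(executors={executor.__class__: executor})
    eng.execute(workflow_context)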

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index ca125a8..ce34005 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -140,6 +140,12 @@ class OperationTask(BaseTask):
         self.arguments = modeling_utils.merge_parameter_values(arguments,
                                                                operation.arguments,
                                                                model_cls=models.Argument)
+        if getattr(self.actor, 'outbound_relationships', None) is not None:
+            self._context_cls = context.operation.NodeOperationContext
+        elif getattr(self.actor, 'source_node', None) is not None:
+            self._context_cls = context.operation.RelationshipOperationContext
+        else:
+            self._context_cls = context.operation.BaseOperationContext
 
     def __repr__(self):
         return self.name
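
The operation context class is now resolved when the API task is built, by
duck-typing the actor. Restated as a standalone helper purely for clarity
(_resolve_context_cls is not part of this diff):

    def _resolve_context_cls(actor):
        # nodes expose outbound_relationships; relationships expose source_node
        if getattr(actor, 'outbound_relationships', None) is not None:
            return context.operation.NodeOperationContext
        elif getattr(actor, 'source_node', None) is not None:
            return context.operation.RelationshipOperationContext
        return context.operation.BaseOperationContext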

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index cec561f..9f0ddd7 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -25,7 +25,8 @@ from aria.modeling import models
 from aria.orchestrator import events
 from aria.orchestrator.context import operation
 
-from .. import exceptions, executor, api
+from .. import exceptions
+from ..executor.base import StubTaskExecutor
 # Import required so all signals are registered
 from . import events_handler  # pylint: disable=unused-import
 
@@ -35,25 +36,26 @@ class Engine(logger.LoggerMixin):
     The workflow engine. Executes workflows
     """
 
-    def __init__(self, default_executor, **kwargs):
+    def __init__(self, executors, **kwargs):
         super(Engine, self).__init__(**kwargs)
-        self._executors = {default_executor.__class__: default_executor}
-        self._executing_tasks = []
+        self._executors = executors.copy()
+        self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
     def execute(self, ctx):
         """
         execute the workflow
         """
+        executing_tasks = []
         try:
             events.start_workflow_signal.send(ctx)
             while True:
                 cancel = self._is_cancel(ctx)
                 if cancel:
                     break
-                for task in self._ended_tasks(ctx):
-                    self._handle_ended_tasks(ctx, task)
+                for task in self._ended_tasks(ctx, executing_tasks):
+                    self._handle_ended_tasks(ctx, task, executing_tasks)
                 for task in self._executable_tasks(ctx):
-                    self._handle_executable_task(ctx, task)
+                    self._handle_executable_task(ctx, task, executing_tasks)
                 if self._all_tasks_consumed(ctx):
                     break
                 else:
@@ -77,7 +79,7 @@ class Engine(logger.LoggerMixin):
 
     @staticmethod
     def _is_cancel(ctx):
-        execution = ctx.model.execution.update(ctx.execution)
+        execution = ctx.model.execution.refresh(ctx.execution)
         return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
     def _executable_tasks(self, ctx):
@@ -88,8 +90,9 @@ class Engine(logger.LoggerMixin):
             not self._task_has_dependencies(ctx, task)
         )
 
-    def _ended_tasks(self, ctx):
-        for task in self._executing_tasks:
+    @staticmethod
+    def _ended_tasks(ctx, executing_tasks):
+        for task in executing_tasks:
             if task.has_ended() and task in ctx._graph:
                 yield task
 
@@ -106,12 +109,11 @@ class Engine(logger.LoggerMixin):
         for task in ctx.execution.tasks:
             yield ctx.model.task.refresh(task)
 
-    def _handle_executable_task(self, ctx, task):
-        if task._executor not in self._executors:
-            self._executors[task._executor] = task._executor()
+    def _handle_executable_task(self, ctx, task, executing_tasks):
         task_executor = self._executors[task._executor]
 
-        context_cls = task._context_cls or operation.BaseOperationContext
+        # If the task is a stub, fall back to a default context;
+        # otherwise the task carries its context class
+        context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls
         op_ctx = context_cls(
             model_storage=ctx.model,
             resource_storage=ctx.resource,
@@ -123,104 +125,16 @@ class Engine(logger.LoggerMixin):
             name=task.name
         )
 
-        self._executing_tasks.append(task)
+        executing_tasks.append(task)
 
-        if not task.stub_type:
+        if not task._stub_type:
             events.sent_task_signal.send(op_ctx)
         task_executor.execute(op_ctx)
 
-    def _handle_ended_tasks(self, ctx, task):
-        self._executing_tasks.remove(task)
+    @staticmethod
+    def _handle_ended_tasks(ctx, task, executing_tasks):
+        executing_tasks.remove(task)
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
             ctx._graph.remove_node(task)
-
-
-def construct_execution_tasks(execution,
-                              task_graph,
-                              default_executor,
-                              stub_executor=executor.base.StubTaskExecutor,
-                              start_stub_type=models.Task.START_WORKFLOW,
-                              end_stub_type=models.Task.END_WORKFLOW,
-                              depends_on=()):
-    """
-    Translates the user graph to the execution graph
-    :param task_graph: The user's graph
-    :param start_stub_type: internal use
-    :param end_stub_type: internal use
-    :param depends_on: internal use
-    """
-    depends_on = list(depends_on)
-
-    # Insert start marker
-    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
-                             _executor=stub_executor,
-                             execution=execution,
-                             stub_type=start_stub_type,
-                             dependencies=depends_on)
-
-    for task in task_graph.topological_order(reverse=True):
-        operation_dependencies = _get_tasks_from_dependencies(
-            execution, task_graph.get_dependencies(task), [start_task])
-
-        if isinstance(task, api.task.OperationTask):
-            models.Task.from_api_task(api_task=task,
-                                      executor=default_executor,
-                                      dependencies=operation_dependencies)
-
-        elif isinstance(task, api.task.WorkflowTask):
-            # Build the graph recursively while adding start and end markers
-            construct_execution_tasks(
-                execution=execution,
-                task_graph=task,
-                default_executor=default_executor,
-                stub_executor=stub_executor,
-                start_stub_type=models.Task.START_SUBWROFKLOW,
-                end_stub_type=models.Task.END_SUBWORKFLOW,
-                depends_on=operation_dependencies
-            )
-        elif isinstance(task, api.task.StubTask):
-            models.Task(api_id=task.id,
-                        _executor=stub_executor,
-                        execution=execution,
-                        stub_type=models.Task.STUB,
-                        dependencies=operation_dependencies)
-        else:
-            raise RuntimeError('Undefined state')
-
-    # Insert end marker
-    models.Task(api_id=_end_graph_suffix(task_graph.id),
-                _executor=stub_executor,
-                execution=execution,
-                stub_type=end_stub_type,
-                dependencies=_get_non_dependent_tasks(execution) or [start_task])
-
-
-def _start_graph_suffix(api_id):
-    return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
-    return '{0}-End'.format(api_id)
-
-
-def _get_non_dependent_tasks(execution):
-    dependency_tasks = set()
-    for task in execution.tasks:
-        dependency_tasks.update(task.dependencies)
-    return list(set(execution.tasks) - set(dependency_tasks))
-
-
-def _get_tasks_from_dependencies(execution, dependencies, default=()):
-    """
-    Returns task list from dependencies.
-    """
-    tasks = []
-    for dependency in dependencies:
-        if getattr(dependency, 'actor', False):
-            dependency_name = dependency.id
-        else:
-            dependency_name = _end_graph_suffix(dependency.id)
-        tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
-    return tasks or default
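
Design note: _executing_tasks moves from Engine instance state into a local
list threaded through the helpers, so a single Engine no longer accumulates
per-run state and can be reused across executions. The loop, condensed (the
sleep in the trailing else branch lies outside this hunk and is assumed):

    executing_tasks = []
    events.start_workflow_signal.send(ctx)
    while True:
        if self._is_cancel(ctx):  # refresh(), not update(), so external
            break                 # status changes are never overwritten
        for task in self._ended_tasks(ctx, executing_tasks):
            self._handle_ended_tasks(ctx, task, executing_tasks)
        for task in self._executable_tasks(ctx):
            self._handle_executable_task(ctx, task, executing_tasks)
        if self._all_tasks_consumed(ctx):
            break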

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 3a780d5..2d71d2a 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -31,13 +31,13 @@ from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.status = ctx.task.SENT
 
 
 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.started_at = datetime.utcnow()
         ctx.task.status = ctx.task.STARTED
         _update_node_state_if_necessary(ctx, is_transitional=True)
@@ -45,7 +45,7 @@ def _task_started(ctx, *args, **kwargs):
 
 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         should_retry = all([
             not isinstance(exception, exceptions.TaskAbortException),
             ctx.task.attempts_count < ctx.task.max_attempts or
@@ -71,7 +71,7 @@ def _task_failed(ctx, exception, *args, **kwargs):
 
 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.ended_at = datetime.utcnow()
         ctx.task.status = ctx.task.SUCCESS
 
@@ -80,7 +80,7 @@ def _task_succeeded(ctx, *args, **kwargs):
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         # the execution may already be in the process of cancelling
         if execution.status in (execution.CANCELLING, execution.CANCELLED):
@@ -91,7 +91,7 @@ def _workflow_started(workflow_context, *args, **kwargs):
 
 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.error = str(exception)
         execution.status = execution.FAILED
@@ -100,7 +100,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
 
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.SUCCEEDED
         execution.ended_at = datetime.utcnow()
@@ -108,7 +108,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
 
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         # _workflow_cancelling function may have called this function already
         if execution.status == execution.CANCELLED:
@@ -123,7 +123,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         if execution.status == execution.PENDING:
             return _workflow_cancelled(workflow_context=workflow_context)
@@ -132,7 +132,6 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
             _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
         else:
             execution.status = execution.CANCELLING
-            workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(ctx, is_transitional=False):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
new file mode 100644
index 0000000..a9f8d18
--- /dev/null
+++ b/aria/orchestrator/workflows/core/task.py
@@ -0,0 +1,119 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Translates the API task graph into execution model tasks.
+"""
+
+from ....modeling import models
+from .. import executor, api
+
+
+def create_execution_tasks(ctx, task_graph, default_executor):
+    execution = ctx.execution
+    _construct_execution_tasks(execution, task_graph, default_executor)
+    ctx.model.execution.update(execution)
+    return execution.tasks
+
+
+def _construct_execution_tasks(execution,
+                               task_graph,
+                               default_executor,
+                               stub_executor=executor.base.StubTaskExecutor,
+                               start_stub_type=models.Task.START_WORKFLOW,
+                               end_stub_type=models.Task.END_WORKFLOW,
+                               depends_on=()):
+    """
+    Translates the user graph to the execution graph
+    :param task_graph: The user's graph
+    :param start_stub_type: internal use
+    :param end_stub_type: internal use
+    :param depends_on: internal use
+    """
+    depends_on = list(depends_on)
+
+    # Insert start marker
+    start_task = models.Task(execution=execution,
+                             dependencies=depends_on,
+                             _api_id=_start_graph_suffix(task_graph.id),
+                             _stub_type=start_stub_type,
+                             _executor=stub_executor)
+
+    for task in task_graph.topological_order(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(
+            execution, task_graph.get_dependencies(task), [start_task])
+
+        if isinstance(task, api.task.OperationTask):
+            models.Task.from_api_task(api_task=task,
+                                      executor=default_executor,
+                                      dependencies=operation_dependencies)
+
+        elif isinstance(task, api.task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
+            _construct_execution_tasks(
+                execution=execution,
+                task_graph=task,
+                default_executor=default_executor,
+                stub_executor=stub_executor,
+                start_stub_type=models.Task.START_SUBWROFKLOW,
+                end_stub_type=models.Task.END_SUBWORKFLOW,
+                depends_on=operation_dependencies
+            )
+        elif isinstance(task, api.task.StubTask):
+            models.Task(execution=execution,
+                        dependencies=operation_dependencies,
+                        _api_id=task.id,
+                        _executor=stub_executor,
+                        _stub_type=models.Task.STUB)
+        else:
+            raise RuntimeError('Undefined state')
+
+    # Insert end marker
+    models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task],
+                execution=execution,
+                _api_id=_end_graph_suffix(task_graph.id),
+                _executor=stub_executor,
+                _stub_type=end_stub_type)
+
+
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+    tasks_with_dependencies = set()
+    for task in execution.tasks:
+        tasks_with_dependencies.update(task.dependencies)
+    return list(set(execution.tasks) - set(tasks_with_dependencies))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    tasks = []
+    for dependency in dependencies:
+        if getattr(dependency, 'actor', False):
+            # This is an operation task; its own id is the dependency name
+            dependency_name = dependency.id
+        else:
+            dependency_name = _end_graph_suffix(dependency.id)
+        tasks.extend(task for task in execution.tasks if task._api_id == dependency_name)
+    return tasks or default
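
To make the marker naming concrete: a task graph with id '42' holding a single
operation task expands to '42-Start' -> <operation task> -> '42-End', and a
nested WorkflowTask contributes its own start/end markers in between (the
graph id here is invented for illustration):

    _start_graph_suffix('42')  # '42-Start'
    _end_graph_suffix('42')    # '42-End'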

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 12aebab..4cee867 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,66 +35,54 @@ def _get_task_name(task):
 
 @events.start_task_signal.connect
 def _start_task_handler(ctx, **kwargs):
-    with ctx.track_changes:
-        # If the task has no function this is an empty task.
-        if ctx.task.function:
-            suffix = 'started...'
-            logger = ctx.logger.info
-        else:
-            suffix = 'has no implementation'
-            logger = ctx.logger.debug
+    # If the task has no function this is an empty task.
+    if ctx.task.function:
+        suffix = 'started...'
+        logger = ctx.logger.info
+    else:
+        suffix = 'has no implementation'
+        logger = ctx.logger.debug
 
-        logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
-            name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+    logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+        name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
 
 
 @events.on_success_task_signal.connect
 def _success_task_handler(ctx, **kwargs):
-    with ctx.track_changes:
-        if not ctx.task.function:
-            return
-        ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
-                        .format(name=_get_task_name(ctx.task), task=ctx.task))
+    if not ctx.task.function:
+        return
+    ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+                    .format(name=_get_task_name(ctx.task), task=ctx.task))
 
 
 @events.on_failure_task_signal.connect
 def _failure_operation_handler(ctx, traceback, **kwargs):
-    with ctx.track_changes:
-        ctx.logger.error(
-            '{name} {task.interface_name}.{task.operation_name} failed'
-            .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
-        )
+    ctx.logger.error(
+        '{name} {task.interface_name}.{task.operation_name} failed'
+        .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+    )
 
 
 @events.start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+    context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
 
 
 @events.on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
 
 
 @events.on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info(
-            "'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)
-        )
+    context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
 
 
 @events.on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
 
 
 @events.on_cancelling_workflow_signal.connect
 def _cancelling_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info(
-            "Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)
-        )
+    context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 9e1ce7e..257d12c 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,7 +33,7 @@ class BaseExecutor(logger.LoggerMixin):
         Execute a task
         :param task: task to execute
         """
-        with ctx.track_changes:
+        with ctx.persist_changes:
             if ctx.task.function:
                 self._execute(ctx)
             else:
@@ -64,5 +64,5 @@ class BaseExecutor(logger.LoggerMixin):
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method
     def execute(self, ctx, *args, **kwargs):
-        with ctx.track_changes:
+        with ctx.persist_changes:
             ctx.task.status = ctx.task.SUCCESS
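
Executor subclasses now only implement _execute; BaseExecutor.execute wraps
both branches in ctx.persist_changes, so task mutations are flushed without
explicit model calls. An illustrative subclass (not part of this diff; a real
executor must also drive the task to an end state so the engine can consume
it):

    class LoggingExecutor(BaseExecutor):
        def _execute(self, ctx):
            # already running inside ctx.persist_changes via execute()
            ctx.logger.info('would execute {0}'.format(ctx.task.function))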

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 88d2e12..9d86125 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -26,7 +26,7 @@ class DryExecutor(base.BaseExecutor):
     Executor which dry runs tasks - prints task information without causing any side effects
     """
     def execute(self, ctx):
-        with ctx.track_changes:
+        with ctx.persist_changes:
             # updating the task manually instead of calling self._task_started(task),
             # to avoid any side effects raising that event might cause
             ctx.task.started_at = datetime.utcnow()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/end2end/testenv.py
----------------------------------------------------------------------
diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py
index 87ca5bd..85714e5 100644
--- a/tests/end2end/testenv.py
+++ b/tests/end2end/testenv.py
@@ -60,9 +60,6 @@ class TestEnvironment(object):
 
     def execute_workflow(self, service_name, workflow_name, dry=False):
         self.cli.executions.start(workflow_name, service_name=service_name, dry=dry)
-        service = self.model_storage.service.get_by_name(service_name)
-        for active_execution in [e for e in service.executions if not e.has_ended()]:
-            self.model_storage.execution.refresh(active_execution)
 
     def verify_clean_storage(self):
         assert len(self.model_storage.service_template.list()) == 0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index abe92b9..777c051 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -15,8 +15,7 @@
 
 import sys
 
-from aria.orchestrator.workflows.core import engine
-from aria.orchestrator import workflow_runner
+from aria.orchestrator.workflows.core import engine, task
 
 
 def op_path(func, module_path=None):
@@ -27,8 +26,7 @@ def op_path(func, module_path=None):
 def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
-    engine.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
-    workflow_context.execution = workflow_context.execution
-    eng = engine.Engine(executor)
+    task.create_execution_tasks(workflow_context, graph, executor.__class__)
+    eng = engine.Engine(executors={executor.__class__: executor})
 
     eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index f654fe5..9550d12 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -408,7 +408,7 @@ def _assert_loggins(ctx, arguments):
     assert len(executions) == 1
     execution = executions[0]
 
-    tasks = ctx.model.task.list(filters={'stub_type': None})
+    tasks = ctx.model.task.list(filters={'_stub_type': None})
     assert len(tasks) == 1
     task = tasks[0]
     assert len(task.logs) == 4

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 84d8952..c9227e6 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 import tests
@@ -48,17 +48,15 @@ def test_serialize_operation_context(context, executor, tmpdir):
     context.model.node.update(node)
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
-    eng = engine.Engine(executor)
+    task.create_execution_tasks(context, graph, executor.__class__)
+    eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
 
 
 @workflow
 def _mock_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-    task = api.task.OperationTask(node, interface_name='test', operation_name='op')
-    graph.add_tasks(task)
+    graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
     return graph
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index cb1503b..361ddab 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException
 from aria.orchestrator.execution_plugin import local
 from aria.orchestrator.execution_plugin import constants
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 
 from tests import mock
 from tests import storage
@@ -500,10 +500,8 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        engine.construct_execution_tasks(
-            workflow_context.execution, tasks_graph, executor.__class__)
-        workflow_context.execution = workflow_context.execution
-        eng = engine.Engine(executor)
+        task.create_execution_tasks(workflow_context, tasks_graph, executor.__class__)
+        eng = engine.Engine({executor.__class__: executor})
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 2eb3c0b..1498cd1 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -29,7 +29,7 @@ from aria.orchestrator import events
 from aria.orchestrator import workflow
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.exceptions import ExecutorException
 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
 from aria.orchestrator.execution_plugin import operations
@@ -254,10 +254,8 @@ class TestWithActualSSHServer(object):
             graph.sequence(*ops)
             return graph
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
-        engine.construct_execution_tasks(
-            self._workflow_context.execution, tasks_graph, self._executor.__class__)
-        self._workflow_context.execution = self._workflow_context.execution
-        eng = engine.Engine(self._executor)
+        task.create_execution_tasks(self._workflow_context, tasks_graph, self._executor.__class__)
+        eng = engine.Engine({self._executor.__class__: self._executor})
         eng.execute(self._workflow_context)
         return self._workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index c2312b1..40f9035 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
     with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
         _create_workflow_runner(request, mock_workflow)
         _, engine_kwargs = mock_engine_cls.call_args
-        assert isinstance(engine_kwargs.get('default_executor'), ProcessExecutor)
+        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
 
 
 def test_custom_executor(request):
@@ -109,7 +109,7 @@ def test_custom_executor(request):
     with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
         _create_workflow_runner(request, mock_workflow, executor=custom_executor)
         _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('default_executor') == custom_executor
+        assert engine_kwargs.get('executors').values()[0] == custom_executor
 
 
 def test_task_configuration_parameters(request):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7bdac67..44ec1da 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -28,7 +28,7 @@ from aria.orchestrator.workflows import (
     api,
     exceptions,
 )
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import thread
 
 from tests import mock, storage
@@ -50,11 +50,9 @@ class BaseTest(object):
     @staticmethod
     def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
-        execution = workflow_context.execution
-        engine.construct_execution_tasks(execution, graph, executor.__class__)
-        workflow_context.execution = execution
+        task.create_execution_tasks(workflow_context, graph, executor.__class__)
 
-        return engine.Engine(default_executor=executor)
+        return engine.Engine(executors={executor.__class__: executor})
 
     @staticmethod
     def _create_interface(ctx, func, arguments=None):
@@ -101,7 +99,7 @@ class BaseTest(object):
     @pytest.fixture(autouse=True)
     def signals_registration(self, ):
         def sent_task_handler(ctx, *args, **kwargs):
-            if ctx.task.stub_type is None:
+            if ctx.task._stub_type is None:
                 calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
                 global_test_holder['sent_task_signal_calls'] = calls + 1
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index f0699df..5f4868a 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.orchestrator.decorators import operation, workflow
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor.thread import ThreadExecutor
 from aria.orchestrator.workflows import api
 from aria.modeling.service_instance import NodeBase
@@ -113,13 +113,12 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
-    engine.construct_execution_tasks(
-        ctx.execution,
+    task.create_execution_tasks(
+        ctx,
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
         ThreadExecutor)
-    ctx.execution = ctx.execution
 
-    eng = engine.Engine(default_executor=ThreadExecutor())
+    eng = engine.Engine(executors={ThreadExecutor: ThreadExecutor()})
     eng.execute(ctx)
     return node
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 569e8be..044d498 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,10 +17,8 @@ from networkx import topological_sort
 
 from aria.modeling import models
 from aria.orchestrator import context
-from aria.orchestrator.workflows import (
-    api,
-    core
-)
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import task
 from aria.orchestrator.workflows.executor import base
 from tests import mock
 from tests import storage
@@ -67,11 +65,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(inner_task_graph, simple_before_task)
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
-    # Direct check
-    execution = workflow_context.model.execution.list()[0]
-
-    core.engine.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
-    workflow_context.execution = execution
+    task.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor)
 
     execution_tasks = topological_sort(workflow_context._graph)
 
@@ -87,23 +81,23 @@ def test_task_graph_into_execution_graph(tmpdir):
         '{0}-End'.format(test_task_graph.id)
     ]
 
-    assert expected_tasks_names == [t.api_id for t in execution_tasks]
+    assert expected_tasks_names == [t._api_id for t in execution_tasks]
     assert all(isinstance(task, models.Task) for task in execution_tasks)
     execution_tasks = iter(execution_tasks)
 
-    assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
     _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
-    assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+    assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
     _assert_execution_is_api_task(next(execution_tasks), inner_task)
-    assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
     _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
-    assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
 
     storage.release_sqlite_storage(workflow_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):
-    assert execution_task.api_id == api_task.id
+    assert execution_task._api_id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 97b24f3..83584a6 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -53,7 +53,7 @@ class MockContext(object):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 8ede925..8cd8123 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -17,7 +17,7 @@ import pytest
 
 from aria import extension
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 
@@ -57,9 +57,8 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
-    eng = engine.Engine(executor)
+    task.create_execution_tasks(context, graph, executor.__class__)
+    eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f84ad06b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 9eb8916..f0451d1 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -18,7 +18,7 @@ import copy
 import pytest
 
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 from aria.orchestrator.workflows import exceptions
@@ -107,9 +107,8 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
-    context.execution = context.execution
-    eng = engine.Engine(executor)
+    task.create_execution_tasks(context, graph, executor.__class__)
+    eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None