You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ariatosca.apache.org by mx...@apache.org on 2017/06/14 14:17:21 UTC

incubator-ariatosca git commit: tasks no longer execute their executor

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 092b45f0d -> 58e212c7d


tasks no longer execute their executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/58e212c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/58e212c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/58e212c7

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 58e212c7dbdea7aa9b532bba9fea4a5359a49019
Parents: 092b45f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 17:17:16 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 17:17:16 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  36 ++----
 aria/orchestrator/context/operation.py          |   3 +
 aria/orchestrator/workflow_runner.py            | 123 +++++++++++++++++--
 aria/orchestrator/workflows/core/__init__.py    |   2 +-
 aria/orchestrator/workflows/core/engine.py      |  60 +++++----
 .../workflows/core/events_handler.py            |   2 +-
 aria/orchestrator/workflows/core/translation.py | 112 -----------------
 aria/orchestrator/workflows/executor/base.py    |  13 +-
 aria/orchestrator/workflows/executor/thread.py  |   2 -
 tests/orchestrator/context/__init__.py          |  11 +-
 tests/orchestrator/context/test_operation.py    |  20 ++-
 tests/orchestrator/context/test_serialize.py    |  15 ++-
 tests/orchestrator/context/test_toolbelt.py     |   7 +-
 .../orchestrator/execution_plugin/test_local.py |  15 ++-
 tests/orchestrator/test_workflow_runner.py      |   2 +-
 .../orchestrator/workflows/core/test_engine.py  |  21 ++--
 .../orchestrator/workflows/core/test_events.py  |  12 +-
 .../test_task_graph_into_execution_graph.py     |  48 +++-----
 .../workflows/executor/test_executor.py         |  39 +++---
 19 files changed, 271 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 4a5771a..c0b7f04 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -24,6 +24,7 @@ classes:
 from contextlib import contextmanager
 from datetime import datetime
 
+from networkx import DiGraph
 from sqlalchemy import (
     Column,
     Integer,
@@ -309,7 +310,6 @@ class TaskBase(mixins.ModelMixin):
     api_id = Column(String)
 
     _executor = Column(PickleType)
-    _executor_kwargs = Column(modeling_types.Dict)
     _context_cls = Column(PickleType)
 
     @declared_attr
@@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin):
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
 
-    # @declared_attr
-    # def dependency_fk(cls):
-    #     """For Type one-to-many to Type"""
-    #     return relationship.foreign_key('task', nullable=True)
+    @declared_attr
+    def dependency_fk(self):
+        return relationship.foreign_key('task', nullable=True)
 
     @declared_attr
-    def dependent_tasks(cls):
-        return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies')
+    def dependencies(cls):
+        # symmetric relationship causes funky graphs
+        return relationship.one_to_many_self(cls, 'dependency_fk')
 
     def has_ended(self):
         if self.stub_type is not None:
@@ -422,7 +422,7 @@ class TaskBase(mixins.ModelMixin):
             return self.status in (self.PENDING, self.RETRYING)
 
     @classmethod
-    def from_api_task(cls, api_task, executor, executor_kwargs=None, **kwargs):
+    def from_api_task(cls, api_task, executor, **kwargs):
         from aria.orchestrator import context
         instantiation_kwargs = {}
 
@@ -454,32 +454,12 @@ class TaskBase(mixins.ModelMixin):
                 'api_id': api_task.id,
                 '_context_cls': context_cls,
                 '_executor': executor,
-                '_executor_kwargs': executor_kwargs or {}
         })
 
         instantiation_kwargs.update(**kwargs)
 
         return cls(**instantiation_kwargs)
 
-    def execute(self, ctx, executor_kwargs=None):
-        from aria.orchestrator.context import operation
-        context_cls = self._context_cls or operation.BaseOperationContext
-        op_ctx = context_cls(
-            model_storage=ctx.model,
-            resource_storage=ctx.resource,
-            workdir=ctx._workdir,
-            task_id=self.id,
-            actor_id=self.actor.id if self.actor else None,
-            service_id=self.execution.service.id,
-            execution_id=self.execution.id,
-            name=self.name
-        )
-        executor = self._executor(**dict(self._executor_kwargs or {}, **(executor_kwargs or {})))
-        try:
-            return executor.execute(op_ctx)
-        except BaseException:
-            executor.close()
-
 
 class LogBase(mixins.ModelMixin):
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index af7220d..496739f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -65,6 +65,9 @@ class BaseOperationContext(common.BaseContext):
             self._thread_local.task = self.model.task.get(self._task_id)
         return self._thread_local.task
 
+    def update_task(self):
+        self.model.task.update(self.task)
+
     @property
     def plugin_workdir(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 961796a..f09cb79 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -21,11 +21,15 @@ import os
 import sys
 from datetime import datetime
 
+from networkx import DiGraph
+
 from . import exceptions
 from .context.workflow import WorkflowContext
 from .workflows import builtin
 from .workflows.core.engine import Engine
 from .workflows.executor.process import ProcessExecutor
+from .workflows.executor.base import StubTaskExecutor
+from .workflows.api import task
 from ..modeling import models
 from ..modeling import utils as modeling_utils
 from ..utils.imports import import_fullname
@@ -39,7 +43,7 @@ class WorkflowRunner(object):
 
     def __init__(self, workflow_name, service_id, inputs,
                  model_storage, resource_storage, plugin_manager,
-                 executor=None, executor_kwargs=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
         Manages a single workflow execution on a given service.
@@ -80,20 +84,21 @@ class WorkflowRunner(object):
             task_max_attempts=task_max_attempts,
             task_retry_interval=task_retry_interval)
 
+        # Default to a ProcessExecutor instance when the caller supplied no executor
+        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+
         # transforming the execution inputs to dict, to pass them to the workflow function
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
+
         self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+        construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
 
-        # Set default executor and kwargs
-        if executor is None:
-            executor = ProcessExecutor
-            executor_kwargs = dict(plugin_manager=plugin_manager)
+        # Update the state
+        self._model_storage.execution.update(execution)
 
-        self._engine = Engine(
-            executor=executor,
-            executor_kwargs=executor_kwargs,
-            workflow_context=workflow_context,
-            tasks_graph=self._tasks_graph)
+        self._engine = Engine(executor=executor,
+                              workflow_context=workflow_context,
+                              execution_graph=get_execution_graph(self.execution))
 
     @property
     def execution_id(self):
@@ -171,3 +176,101 @@ class WorkflowRunner(object):
                     self._workflow_name, workflow.function))
 
         return workflow_fn
+
+
+def get_execution_graph(execution):
+    graph = DiGraph()
+    for task in execution.tasks:
+        for dependency in task.dependencies:
+            graph.add_edge(dependency, task)
+
+    return graph
+
+
+def construct_execution_tasks(execution,
+                              task_graph,
+                              default_executor,
+                              stub_executor=StubTaskExecutor,
+                              start_stub_type=models.Task.START_WORKFLOW,
+                              end_stub_type=models.Task.END_WORKFLOW,
+                              depends_on=()):
+    """
+    Translates the user graph to the execution graph
+    :param task_graph: The user's graph
+    :param start_stub_type: internal use
+    :param end_stub_type: internal use
+    :param depends_on: internal use
+    """
+    depends_on = list(depends_on)
+
+    # Insert start marker
+    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
+                             _executor=stub_executor,
+                             execution=execution,
+                             stub_type=start_stub_type,
+                             dependencies=depends_on)
+
+    for api_task in task_graph.topological_order(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(execution,
+            task_graph.get_dependencies(api_task), [start_task])
+
+        if isinstance(api_task, task.OperationTask):
+            models.Task.from_api_task(api_task=api_task,
+                                      executor=default_executor,
+                                      dependencies=operation_dependencies)
+
+        elif isinstance(api_task, task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
+            construct_execution_tasks(
+                execution=execution,
+                task_graph=api_task,
+                default_executor=default_executor,
+                stub_executor=stub_executor,
+                start_stub_type=models.Task.START_SUBWROFKLOW,
+                end_stub_type=models.Task.END_SUBWORKFLOW,
+                depends_on=operation_dependencies
+            )
+        elif isinstance(api_task, task.StubTask):
+            models.Task(api_id=api_task.id,
+                        _executor=stub_executor,
+                        execution=execution,
+                        stub_type=models.Task.STUB,
+                        dependencies=operation_dependencies)
+        else:
+            raise
+
+    # Insert end marker
+    models.Task(api_id=_end_graph_suffix(task_graph.id),
+                _executor=stub_executor,
+                execution=execution,
+                stub_type=end_stub_type,
+                dependencies=_get_non_dependent_tasks(execution) or [start_task])
+
+
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+    dependency_tasks = set()
+    for task in execution.tasks:
+        dependency_tasks.update(task.dependencies)
+    return list(set(execution.tasks) - set(dependency_tasks))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+        """
+        Returns task list from dependencies.
+        """
+        tasks = []
+        for dependency in dependencies:
+            if getattr(dependency, 'actor', False):
+                dependency_name = dependency.id
+            else:
+                dependency_name = _end_graph_suffix(dependency.id)
+            tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
+        return tasks or default

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py
index 98938be..81db43f 100644
--- a/aria/orchestrator/workflows/core/__init__.py
+++ b/aria/orchestrator/workflows/core/__init__.py
@@ -17,4 +17,4 @@
 Core for the workflow execution mechanism
 """
 
-from . import translation, engine
+from . import engine

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 67e8c1d..e1b6412 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -20,14 +20,12 @@ The workflow engine. Executes workflows
 import time
 from datetime import datetime
 
-import networkx
-
 from aria import logger
 from aria.modeling import models
 from aria.orchestrator import events
+from aria.orchestrator.context import operation
 
 from .. import exceptions
-from . import translation
 # Import required so all signals are registered
 from . import events_handler  # pylint: disable=unused-import
 
@@ -37,18 +35,11 @@ class Engine(logger.LoggerMixin):
     The workflow engine. Executes workflows
     """
 
-    def __init__(self, executor, workflow_context, tasks_graph, executor_kwargs=None, **kwargs):
+    def __init__(self, executor, workflow_context, execution_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
-        self._execution_graph = networkx.DiGraph()
-        self._executor_kwargs = executor_kwargs
-        translation.store_tasks(task_graph=tasks_graph,
-                                execution_graph=self._execution_graph,
-                                default_executor=executor,
-                                execution=workflow_context.execution)
-
-        # Flush changes
-        workflow_context.model.execution.update(workflow_context.execution)
+        self._executors = {executor.__class__: executor}
+        self._execution_graph = execution_graph
 
     def execute(self):
         """
@@ -83,38 +74,57 @@ class Engine(logger.LoggerMixin):
         will be modified to 'cancelled' directly.
         """
         events.on_cancelling_workflow_signal.send(self._workflow_context)
+        self._workflow_context.execution = self._workflow_context.execution
 
     def _is_cancel(self):
-        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
-                                                           models.Execution.CANCELLED)
+        execution = self._workflow_context.model.execution.update(self._workflow_context.execution)
+        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
     def _executable_tasks(self):
         now = datetime.utcnow()
-        return (task for task in self._tasks_iter()
-                if task.is_waiting() and
-                task.due_at <= now and
-                not self._task_has_dependencies(task))
+        return (
+            task for task in self._tasks_iter()
+            if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task)
+        )
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.has_ended())
+        return (task for task in self._tasks_iter()
+                if task.has_ended() and task in self._execution_graph)
 
     def _task_has_dependencies(self, task):
-        return len(self._execution_graph.pred.get(task.api_id, {})) > 0
+        return task.dependencies and all(d in self._execution_graph for d in task.dependencies)
 
     def _all_tasks_consumed(self):
         return len(self._execution_graph.node) == 0
 
     def _tasks_iter(self):
-        for _, data in self._execution_graph.nodes_iter(data=True):
-            yield self._workflow_context.model.task.get(data['task'].id)
+        for task in self._workflow_context.execution.tasks:
+            yield self._workflow_context.model.task.refresh(task)
 
     def _handle_executable_task(self, task):
         if not task.stub_type:
             events.sent_task_signal.send(task)
-        task.execute(self._workflow_context, self._executor_kwargs)
+
+        if task._executor not in self._executors:
+            self._executors[task._executor] = task._executor()
+        executor = self._executors[task._executor]
+
+        context_cls = task._context_cls or operation.BaseOperationContext
+        op_ctx = context_cls(
+            model_storage=self._workflow_context.model,
+            resource_storage=self._workflow_context.resource,
+            workdir=self._workflow_context._workdir,
+            task_id=task.id,
+            actor_id=task.actor.id if task.actor else None,
+            service_id=task.execution.service.id,
+            execution_id=task.execution.id,
+            name=task.name
+        )
+
+        executor.execute(op_ctx)
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
-            self._execution_graph.remove_node(task.api_id)
+            self._execution_graph.remove_node(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 5d979c4..8b217f5 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -47,7 +47,7 @@ def _task_failed(ctx, exception, *args, **kwargs):
         not isinstance(exception, exceptions.TaskAbortException),
         ctx.task.attempts_count < ctx.task.max_attempts or
         ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
-        # ignore_failure check here means the task will not be retries and it will be marked
+        # ignore_failure check here means the task will not be retried and it will be marked
         # as failed. The engine will also look at ignore_failure so it won't fail the
         # workflow.
         not ctx.task.ignore_failure

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
deleted file mode 100644
index 8da9bd7..0000000
--- a/aria/orchestrator/workflows/core/translation.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Translation of user graph's API to the execution graph
-"""
-
-from ....modeling import models
-from .. import api
-from ..executor import base
-
-
-def store_tasks(ctx, task_graph, default_executor, execution,
-                start_stub_type=models.Task.START_WORKFLOW,
-                end_stub_type=models.Task.END_WORKFLOW,
-                depends_on=()):
-    """
-    Translates the user graph to the execution graph
-    :param task_graph: The user's graph
-    :param workflow_context: The workflow
-    :param execution_graph: The execution graph that is being built
-    :param start_stub_type: internal use
-    :param end_stub_type: internal use
-    :param depends_on: internal use
-    """
-    depends_on = list(depends_on)
-
-    # Insert start marker
-    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
-                             _executor=base.StubTaskExecutor,
-                             execution=execution,
-                             stub_type=start_stub_type,
-                             dependencies=depends_on)
-
-    for api_task in task_graph.topological_order(reverse=True):
-        dependencies = task_graph.get_dependencies(api_task)
-        operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task])
-
-        if isinstance(api_task, api.task.OperationTask):
-             models.Task.from_api_task(
-                 api_task=api_task, executor=default_executor, dependencies=operation_dependencies)
-
-        elif isinstance(api_task, api.task.WorkflowTask):
-            # Build the graph recursively while adding start and end markers
-            store_tasks(
-                ctx=ctx,
-                task_graph=api_task,
-                default_executor=default_executor,
-                execution=execution,
-                start_stub_type=models.Task.START_SUBWROFKLOW,
-                end_stub_type=models.Task.END_SUBWORKFLOW,
-                depends_on=operation_dependencies
-            )
-        elif isinstance(api_task, api.task.StubTask):
-            models.Task(api_id=api_task.id,
-                        _executor=base.StubTaskExecutor,
-                        execution=execution,
-                        stub_type=models.Task.STUB,
-                        dependencies=operation_dependencies)
-        else:
-            raise RuntimeError('Undefined state')
-
-    # Insert end marker
-    workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks]
-    models.Task(api_id=_end_graph_suffix(task_graph.id),
-                _executor=base.StubTaskExecutor,
-                execution=execution,
-                stub_type=end_stub_type,
-                dependencies=workflow_dependencies)
-
-    ctx.model.execution.update(execution)
-
-def _start_graph_suffix(api_id):
-    return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
-    return '{0}-End'.format(api_id)
-
-
-def construct_graph(graph, execution):
-    for task in execution.tasks:
-        for dependency in task.dependencies:
-            graph.add_edge(dependency, task)
-
-    return graph
-
-
-def _get_tasks_from_dependencies(ctx, dependencies, default=()):
-    """
-    Returns task list from dependencies.
-    """
-    tasks = []
-    for dependency in dependencies:
-        if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
-            dependency_name = dependency.id
-        else:
-            dependency_name = _end_graph_suffix(dependency.id)
-        tasks.extend(list(ctx.model.task.list(filters={'name': dependency_name})))
-    return tasks or default

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 54a9438..fc4b800 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,9 +33,9 @@ class BaseExecutor(logger.LoggerMixin):
         Execute a task
         :param task: task to execute
         """
+        ctx.update_task()
         if ctx.task.function:
             self._execute(ctx)
-            ctx.model.task.update(ctx.task)
         else:
             # In this case the task is missing a function. This task still gets to an
             # executor, but since there is nothing to run, we by default simply skip the execution
@@ -52,22 +52,17 @@ class BaseExecutor(logger.LoggerMixin):
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
     def _task_failed(self, ctx, exception, traceback=None):
         events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
     def _task_succeeded(self, ctx):
         events.on_success_task_signal.send(ctx)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
 
 class StubTaskExecutor(BaseExecutor):                                                               # pylint: disable=abstract-method
-
-    def __init__(self, *args, **kwargs):
-        # TODO: the executor kwargs delivery system is bad, so we need to aps the kwargs each time (and they are not persisted - this is bad!)
-        super(StubTaskExecutor, self).__init__()
-
     def execute(self, ctx, *args, **kwargs):
         ctx.task.status = ctx.task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 074b54b..8c447b6 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -68,8 +68,6 @@ class ThreadExecutor(BaseExecutor):
                     self._task_failed(ctx,
                                       exception=e,
                                       traceback=exceptions.get_exception_as_string(*sys.exc_info()))
-                finally:
-                    break
             # Daemon threads
             except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 89a7b2c..cb282a3 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -16,6 +16,7 @@
 import sys
 
 from aria.orchestrator.workflows.core import engine
+from aria.orchestrator import workflow_runner
 
 
 def op_path(func, module_path=None):
@@ -23,8 +24,12 @@ def op_path(func, module_path=None):
     return '{0}.{1}'.format(module_path, func.__name__)
 
 
-def execute(workflow_func, workflow_context, executor, executor_kwargs=None):
+def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
-    eng = engine.Engine(executor=executor, executor_kwargs=executor_kwargs,
-                        workflow_context=workflow_context, tasks_graph=graph)
+
+    workflow_runner.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
+    workflow_context.execution = workflow_context.execution
+    execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+    eng = engine.Engine(executor, workflow_context, execution_graph)
+
     eng.execute()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index e8f1a63..f654fe5 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -51,7 +51,11 @@ def ctx(tmpdir):
 
 @pytest.fixture
 def thread_executor():
-    return thread.ThreadExecutor
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture
@@ -253,7 +257,12 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
-    return request.param
+    ex_cls, kwargs = request.param
+    ex = ex_cls(**kwargs)
+    try:
+        yield ex
+    finally:
+        ex.close()
 
 
 def test_node_operation_logging(ctx, executor):
@@ -286,8 +295,7 @@ def test_node_operation_logging(ctx, executor):
                 arguments=arguments
             )
         )
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
-            executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
     _assert_loggins(ctx, arguments)
 
 
@@ -320,7 +328,7 @@ def test_relationship_operation_logging(ctx, executor):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
     _assert_loggins(ctx, arguments)
 
 
@@ -377,7 +385,7 @@ def test_attribute_consumption(ctx, executor, dataholder):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
     target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
 
     assert len(source_node.attributes) == len(target_node.attributes) == 2

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index f7e441e..4975d20 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -18,7 +18,7 @@ import pytest
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 import tests
 from tests import mock
 from tests import storage
@@ -28,14 +28,16 @@ TEST_FILE_ENTRY_ID = 'entry'
 TEST_FILE_NAME = 'test_file'
 
 
-def test_serialize_operation_context(context, executor, executor_kwargs, tmpdir):
+def test_serialize_operation_context(context, executor, tmpdir):
     test_file = tmpdir.join(TEST_FILE_NAME)
     test_file.write(TEST_FILE_CONTENT)
     resource = context.resource
     resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph,
-                        executor_kwargs=executor_kwargs)
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    execution_graph = workflow_runner.get_execution_graph(context.execution)
+    eng = engine.Engine(executor, context, execution_graph)
     eng.execute()
 
 
@@ -83,8 +85,9 @@ def _operation_mapping():
 
 @pytest.fixture
 def executor():
-    return process.ProcessExecutor
-
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
 
 @pytest.fixture
 def executor_kwargs():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index df54d9d..4de9e55 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -41,7 +41,12 @@ def workflow_context(tmpdir):
 
 @pytest.fixture
 def executor():
-    return thread.ThreadExecutor
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
 
 @pytest.fixture
 def dataholder(tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 8e8c6e0..8414240 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -19,7 +19,7 @@ import os
 import pytest
 
 from aria import workflow
-from aria.orchestrator import events
+from aria.orchestrator import events, workflow_runner
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.exceptions import ExecutorException
 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
@@ -498,17 +498,20 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        eng = engine.Engine(
-            executor=executor,
-            workflow_context=workflow_context,
-            tasks_graph=tasks_graph)
+        workflow_runner.construct_execution_tasks(
+            workflow_context.execution, tasks_graph, executor.__class__)
+        workflow_context.execution = workflow_context.execution
+        execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+        eng = engine.Engine(executor, workflow_context, execution_graph)
         eng.execute()
         return workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes
 
     @pytest.fixture
     def executor(self):
-        return process.ProcessExecutor
+        result = process.ProcessExecutor()
+        yield result
+        result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index baf45d7..c5a62ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
     with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
         _create_workflow_runner(request, mock_workflow)
         _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('executor') == ProcessExecutor
+        assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
 
 
 def test_custom_executor(request):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7ffb92a..108360f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -24,6 +24,7 @@ from aria.orchestrator import (
     operation,
 )
 from aria.modeling import models
+from aria.orchestrator import workflow_runner
 from aria.orchestrator.workflows import (
     api,
     exceptions,
@@ -40,21 +41,23 @@ global_test_holder = {}
 class BaseTest(object):
 
     @classmethod
-    def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
+    def _execute(cls, workflow_func, workflow_context, executor):
         eng = cls._engine(workflow_func=workflow_func,
                           workflow_context=workflow_context,
-                          executor=executor,
-                          executor_kwargs=executor_kwargs)
+                          executor=executor)
         eng.execute()
         return eng
 
     @staticmethod
-    def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
+    def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
+        execution = workflow_context.execution
+        workflow_runner.construct_execution_tasks(execution, graph, executor.__class__)
+        workflow_context.execution = execution
+
         return engine.Engine(executor=executor,
-                             executor_kwargs=executor_kwargs,
                              workflow_context=workflow_context,
-                             tasks_graph=graph)
+                             execution_graph=workflow_runner.get_execution_graph(execution))
 
     @staticmethod
     def _op(ctx,
@@ -128,7 +131,11 @@ class BaseTest(object):
 
     @pytest.fixture
     def executor(self):
-        return thread.ThreadExecutor
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d63a8ef..92582a9 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -15,6 +15,7 @@
 
 import pytest
 
+from aria.orchestrator import workflow_runner
 from tests import mock, storage
 from aria.modeling.service_instance import NodeBase
 from aria.orchestrator.decorators import operation, workflow
@@ -112,13 +113,14 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
+    workflow_runner.construct_execution_tasks(
+        ctx.execution, single_operation_workflow(
+            ctx=ctx, node=node, interface_name=interface_name, op_name=op_name), ThreadExecutor)
+    ctx.execution = ctx.execution
 
-    eng = engine.Engine(executor=ThreadExecutor,
+    eng = engine.Engine(executor=ThreadExecutor(),
                         workflow_context=ctx,
-                        tasks_graph=single_operation_workflow(ctx=ctx,  # pylint: disable=no-value-for-parameter
-                                                              node=node,
-                                                              interface_name=interface_name,
-                                                              op_name=op_name))
+                        execution_graph=workflow_runner.get_execution_graph(ctx.execution))
     eng.execute()
     return node
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 4abed37..aebae38 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,13 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from networkx import topological_sort, DiGraph
+from networkx import topological_sort
 
 from aria.modeling import models
-from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from aria.orchestrator import (
+    context,
+    workflow_runner
+)
+from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import base
-
 from tests import mock
 from tests import storage
 
@@ -66,12 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
     # Direct check
-    core.translation.store_tasks(ctx=task_context,
-                                 task_graph=test_task_graph,
-                                 execution=task_context.model.execution.list()[0],
-                                 default_executor=base.StubTaskExecutor)
+    execution = task_context.model.execution.list()[0]
+
+    workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+    task_context.execution = execution
 
-    execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution)
+    execution_graph = workflow_runner.get_execution_graph(execution)
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7
@@ -87,27 +89,17 @@ def test_task_graph_into_execution_graph(tmpdir):
     ]
 
     assert expected_tasks_names == [t.api_id for t in execution_tasks]
-    assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
-               for task_name in execution_tasks)
-
-    first_task = _get_task_by_name(execution_tasks[0], execution_graph)
-    assert first_task.stub_type == models.Task.START_WORKFLOW
-
-    second_task = _get_task_by_name(execution_tasks[1], execution_graph)
-    _assert_execution_is_api_task(second_task, simple_before_task)
-
-    third_task = _get_task_by_name(execution_tasks[2], execution_graph)
-    assert third_task.stub_type == models.Task.START_SUBWROFKLOW
+    assert all(isinstance(task, models.Task) for task in execution_tasks)
+    execution_tasks = iter(execution_tasks)
 
-    fourth_task = _get_task_by_name(execution_tasks[3], execution_graph)
-    _assert_execution_is_api_task(fourth_task, inner_task)
-    fifth_task = _get_task_by_name(execution_tasks[4], execution_graph)
-    assert fifth_task.stub_type == models.Task.END_SUBWORKFLOW
+    assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+    assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+    _assert_execution_is_api_task(next(execution_tasks), inner_task)
+    assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+    assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
 
-    sixth_task = _get_task_by_name(execution_tasks[5], execution_graph)
-    _assert_execution_is_api_task(sixth_task, simple_after_task)
-    seventh_task = _get_task_by_name(execution_tasks[6], execution_graph)
-    assert seventh_task.stub_type == models.Task.END_WORKFLOW
     storage.release_sqlite_storage(task_context.model)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3fa75ad..410a982 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -36,7 +36,6 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
-from . import MockContext
 
 
 def _get_function(func):
@@ -45,18 +44,24 @@ def _get_function(func):
 
 def execute_and_assert(executor, ctx):
     node = ctx.model.node.list()[0]
+    execution = ctx.model.execution.list()[0]
     expected_value = 'value'
-    successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task),
-                                              node=node,
-                                              _executor=executor)
-    failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node)
-    ctx_with_inputs = ctx.model.task.model_cls(
+    successful_ctx = models.Task(function=_get_function(mock_successful_task),
+                                 node=node, _executor=executor, execution=execution)
+    failing_ctx = models.Task(
+        function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution)
+    ctx_with_inputs = models.Task(
         node=node,
         function=_get_function(mock_task_with_input),
-        arguments={'input': models.Argument.wrap('input', 'value')})
+        arguments={'input': models.Argument.wrap('input', 'value')},
+        _executor=executor,
+        execution=execution)
 
-    for task in [successful_ctx, failing_ctx, ctx_with_inputs]:
-        task.execute(ctx)
+    ctx.model.execution.update(execution)
+
+    for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
+        op_ctx.states = []
+        op_ctx.execute(ctx)
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
@@ -101,22 +106,14 @@ class MockException(Exception):
 @pytest.fixture
 def ctx(tmpdir):
     context = mock.context.simple(str(tmpdir))
+    ctx.states = []
     yield context
     storage.release_sqlite_storage(context.model)
 
 
-@pytest.fixture(params=[
-    (thread.ThreadExecutor, {'pool_size': 1}),
-    # (thread.ThreadExecutor, {'pool_size': 2}),
-    # subprocess needs to load a tests module so we explicitly add the root directory as if
-    # the project has been installed in editable mode
-    # (celery.CeleryExecutor, {'app': app})
-])
-def thread_executor(request):
-    executor_cls, executor_kwargs = request.param
-    result = executor_cls(**executor_kwargs)
-    yield result
-    result.close()
+@pytest.fixture
+def thread_executor():
+    return thread.ThreadExecutor
 
 
 @pytest.fixture