You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/20 14:08:27 UTC

incubator-ariatosca git commit: final touches

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner [created] 4cf88f3fa


final touches


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4cf88f3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4cf88f3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4cf88f3f

Branch: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner
Commit: 4cf88f3fa7281b36fdc0d2022a00c6bdf5699e81
Parents: 86c1c1a
Author: max-orlov <ma...@gigaspaces.com>
Authored: Mon Nov 20 16:08:17 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Nov 20 16:08:17 2017 +0200

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  74 +++++---
 aria/orchestrator/execution/__init__.py         |  17 --
 aria/orchestrator/execution/compiler.py         | 149 ---------------
 aria/orchestrator/execution/runner.py           |  50 -----
 aria/orchestrator/execution_compiler.py         | 161 ++++++++++++++++
 aria/orchestrator/workflows/core/engine.py      |   4 +-
 tests/orchestrator/context/__init__.py          |   2 +-
 tests/orchestrator/context/test_serialize.py    |   2 +-
 .../execution/test_execution_compiler.py        | 189 ++++++-------------
 .../orchestrator/execution_plugin/test_local.py |   2 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   2 +-
 .../orchestrator/workflows/core/test_engine.py  |   2 +-
 .../orchestrator/workflows/core/test_events.py  |   2 +-
 .../executor/test_process_executor_extension.py |   2 +-
 .../test_process_executor_tracked_changes.py    |   2 +-
 15 files changed, 276 insertions(+), 384 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 162abfa..61328a1 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -25,8 +25,9 @@ from .. import utils
 from .. import logger as cli_logger
 from .. import execution_logging
 from ..core import aria
+from ...orchestrator import execution_compiler
 from ...modeling.models import Execution
-from ...orchestrator import execution
+from ...orchestrator.workflows.core.engine import Engine
 from ...orchestrator.workflows.executor.dry import DryExecutor
 from ...utils import formatting
 from ...utils import threading
@@ -143,19 +144,19 @@ def start(workflow_name,
     service = model_storage.service.get_by_name(service_name)
     executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
 
-    new_execution = execution.ExecutionCompiler(
+    compiler = execution_compiler.ExecutionCompiler(
         model_storage, 
         resource_storage, 
         plugin_manager, 
         service, 
         workflow_name
-    ).compile(inputs, executor=executor)
-    model_storage.execution.put(new_execution)
+    )
+    workflow_ctx = compiler.compile(inputs, executor=executor)
 
-    execution_runner = execution.ExecutionRunner(executor=executor)
+    engine = Engine(executor)
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
 
-    _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(engine, workflow_ctx, logger, model_storage, dry, mark_pattern)
 
 
 @executions.command(name='resume',
@@ -188,35 +189,53 @@ def resume(execution_id,
     if execution_to_resume.status != execution_to_resume.CANCELLED:
         logger.info("Can't resume execution {execution.id} - "
                     "execution is in status {execution.status}. "
-                    "Can only resume executions in status {execution_to_resume.CANCELLED}"
+                    "Can only resume executions in status {execution.CANCELLED}"
                     .format(execution=execution_to_resume))
         return
-    
-    execution_runner = execution.ExecutionRunner(executor, True, retry_failed_tasks)
 
-    logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
-    _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
+    workflow_ctx = execution_compiler.ExecutionCompiler(
+        model_storage,
+        resource_storage,
+        plugin_manager,
+        execution_to_resume.service,
+        execution_to_resume.workflow_name
+    ).compile(execution_id=execution_to_resume.id)
 
+    engine = Engine(executor)
 
-def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
-    execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
-                                             workflow_runner.execution.workflow_name)
-    execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
-                                                 name=execution_thread_name)
+    logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+    _run_execution(engine, workflow_ctx, logger, model_storage, dry, mark_pattern,
+                   engine_kwargs=dict(resuming=True, retry_failed=retry_failed_tasks))
+
+
+def _run_execution(
+        engine,
+        ctx,
+        logger,
+        model_storage,
+        dry,
+        mark_pattern,
+        engine_kwargs=None
+):
+    engine_kwargs = engine_kwargs or {}
+    engine_kwargs['ctx'] = ctx
+    execution_thread_name = '{0}_{1}'.format(ctx.execution.service.name,
+                                             ctx.execution.workflow_name)
+    execution_thread = threading.ExceptionThread(target=engine.execute,
+                                                 name=execution_thread_name,
+                                                 **engine_kwargs)
 
     execution_thread.start()
 
-    last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0
-    log_iterator = cli_logger.ModelLogIterator(model_storage,
-                                               workflow_runner.execution_id,
-                                               offset=last_task_id)
+    last_task_id = ctx.execution.logs[-1].id if ctx.execution.logs else 0
+    log_iterator = cli_logger.ModelLogIterator(model_storage, ctx.execution.id, offset=last_task_id)
     try:
         while execution_thread.is_alive():
             execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
             execution_thread.join(1)
 
     except KeyboardInterrupt:
-        _cancel_execution(workflow_runner, execution_thread, logger, log_iterator)
+        _cancel_execution(engine, ctx, execution_thread, logger, log_iterator)
 
     # It might be the case where some logs were written and the execution was terminated, thus we
     # need to drain the remaining logs.
@@ -225,19 +244,18 @@ def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
     # raise any errors from the execution thread (note these are not workflow execution errors)
     execution_thread.raise_error_if_exists()
 
-    execution = workflow_runner.execution
-    logger.info('Execution has ended with "{0}" status'.format(execution.status))
-    if execution.status == Execution.FAILED and execution.error:
-        logger.info('Execution error:{0}{1}'.format(os.linesep, execution.error))
+    logger.info('Execution has ended with "{0}" status'.format(ctx.execution.status))
+    if ctx.execution.status == Execution.FAILED and ctx.execution.error:
+        logger.info('Execution error:{0}{1}'.format(os.linesep, ctx.execution.error))
 
     if dry:
         # remove traces of the dry execution (including tasks, logs, inputs..)
-        model_storage.execution.delete(execution)
+        model_storage.execution.delete(ctx.execution)
 
 
-def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator):
+def _cancel_execution(engine, ctx, execution_thread, logger, log_iterator):
     logger.info('Cancelling execution. Press Ctrl+C again to force-cancel.')
-    workflow_runner.cancel()
+    engine.cancel_execution(ctx)
     while execution_thread.is_alive():
         try:
             execution_logging.log_list(log_iterator)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/__init__.py b/aria/orchestrator/execution/__init__.py
deleted file mode 100644
index ef17fde..0000000
--- a/aria/orchestrator/execution/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from .compiler import ExecutionCompiler
-from .runner import ExecutionRunner
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/compiler.py b/aria/orchestrator/execution/compiler.py
deleted file mode 100644
index f3866bf..0000000
--- a/aria/orchestrator/execution/compiler.py
+++ /dev/null
@@ -1,149 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import sys
-from datetime import datetime
-
-from .. import exceptions
-from ..context.workflow import WorkflowContext
-from ..workflows import builtin
-from ..workflows.core import graph_compiler
-from ..workflows.executor.process import ProcessExecutor
-from ...modeling import models
-from ...modeling import utils as modeling_utils
-from ...utils.imports import import_fullname
-
-
-DEFAULT_TASK_MAX_ATTEMPTS = 30
-DEFAULT_TASK_RETRY_INTERVAL = 30
-
-
-class ExecutionCompiler(object):
-    def __init__(self, model, resource, plugin, service, workflow_name):
-        self._model = model
-        self._resource = resource
-        self._plugin = plugin
-        self._service = service
-        self._workflow_name = workflow_name
-        self._workflow_context = None
-
-    @property
-    def workflow_ctx(self):
-        return self._workflow_context
-
-    def compile(
-            self,
-            execution_inputs=None,
-            executor=None,
-            task_max_attempts=None,
-            task_retry_interval=None):
-
-        execution = self._create_execution_model(execution_inputs)
-        self._model.execution.put(execution)
-        self._set_ctx(execution, task_max_attempts, task_retry_interval)
-        self._create_tasks(execution, executor=executor)
-        self._model.execution.update(execution)
-        return execution
-
-    def _set_ctx(self, execution, task_max_attempts=None, task_retry_interval=None):
-        self._workflow_context = WorkflowContext(
-                name=self.__class__.__name__,
-                model_storage=self._model,
-                resource_storage=self._resource,
-                service_id=execution.service.id,
-                execution_id=execution.id,
-                workflow_name=execution.workflow_name,
-                task_max_attempts=task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS,
-                task_retry_interval=task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
-            )
-
-    def _create_tasks(self, execution, executor=None):
-
-        # Set default executor and kwargs
-        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
-
-        # transforming the execution inputs to dict, to pass them to the workflow function
-        execution_inputs_dict = dict(inp.unwrapped for inp in execution.inputs.itervalues())
-
-        if len(execution.tasks) == 0:
-            workflow_fn = self._get_workflow_fn(execution.workflow_name)
-            self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
-            compiler = graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__)
-            compiler.compile(self._tasks_graph)
-
-    def _create_execution_model(self, inputs=None):
-        self._validate_workflow_exists_for_service()
-        self._validate_no_active_executions()
-
-        execution = models.Execution(
-            created_at=datetime.utcnow(),
-            service_fk=self._service.id,
-            workflow_name=self._workflow_name,
-            inputs={})
-
-        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
-            workflow_inputs = dict()  # built-in workflows don't have any inputs
-        else:
-            workflow_inputs = self._service.workflows[self._workflow_name].inputs
-
-        modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
-                                                     supplied_inputs=inputs or {})
-        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
-                                                             supplied_inputs=inputs or {})
-        execution.inputs = modeling_utils.merge_parameter_values(
-            inputs, workflow_inputs, model_cls=models.Input)
-
-        return execution
-
-    def _validate_no_active_executions(self):
-        active_executions = [e for e in self._service.executions if
-                             e.is_active()]
-        if active_executions:
-            raise exceptions.ActiveExecutionsError(
-                "Can't start execution; Service {0} has an active execution with ID {1}"
-                    .format(self._service.name, active_executions[0].id))
-
-    def _validate_workflow_exists_for_service(self):
-        if self._workflow_name not in self._service.workflows and \
-                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
-            raise exceptions.UndeclaredWorkflowError(
-                'No workflow policy {0} declared in service {1}'
-                    .format(self._workflow_name, self._service.name))
-
-    def _get_workflow_fn(self, workflow_name):
-        if workflow_name in builtin.BUILTIN_WORKFLOWS:
-            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
-                                                    workflow_name))
-
-        workflow = self._service.workflows[workflow_name]
-
-        # TODO: Custom workflow support needs improvement, currently this code uses internal
-        # knowledge of the resource storage; Instead, workflows should probably be loaded
-        # in a similar manner to operation plugins. Also consider passing to import_fullname
-        # as paths instead of appending to sys path.
-        service_template_resources_path = os.path.join(
-            self._resource.service_template.base_path,
-            str(self._service.service_template.id))
-        sys.path.append(service_template_resources_path)
-
-        try:
-            workflow_fn = import_fullname(workflow.function)
-        except ImportError:
-            raise exceptions.WorkflowImplementationNotFoundError(
-                'Could not find workflow {0} function at {1}'.format(
-                    workflow_name, workflow.function))
-
-        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/runner.py b/aria/orchestrator/execution/runner.py
deleted file mode 100644
index a532901..0000000
--- a/aria/orchestrator/execution/runner.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Running workflows.
-"""
-
-from ..workflows.core import engine
-
-
-class ExecutionRunner(object):
-
-    def __init__(self, executor, resume=False, retry_failed_tasks=False):
-        """
-        Manages a single workflow execution on a given service.
-
-        :param workflow_name: workflow name
-        :param service_id: service ID
-        :param inputs: key-value dict of inputs for the execution
-        :param model_storage: model storage API ("MAPI")
-        :param resource_storage: resource storage API ("RAPI")
-        :param plugin_manager: plugin manager
-        :param executor: executor for tasks; defaults to a
-         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
-        :param task_max_attempts: maximum attempts of repeating each failing task
-        :param task_retry_interval: retry interval between retry attempts of a failing task
-        """
-
-        self._is_resume = resume
-        self._retry_failed_tasks = retry_failed_tasks
-        self._engine = engine.Engine(executors={executor.__class__: executor})
-
-    def execute(self, ctx):
-        self._engine.execute(
-            ctx=ctx, resuming=self._is_resume, retry_failed=self._retry_failed_tasks)
-
-    def cancel(self, ctx):
-        self._engine.cancel_execution(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution_compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_compiler.py b/aria/orchestrator/execution_compiler.py
new file mode 100644
index 0000000..01e35c1
--- /dev/null
+++ b/aria/orchestrator/execution_compiler.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+    def __init__(
+            self,
+            model,
+            resource,
+            plugin,
+            service,
+            workflow_name,
+            task_max_attempts=None,
+            task_retry_interval=None
+    ):
+        self._model = model
+        self._resource = resource
+        self._plugin = plugin
+        self._service = service
+        self._workflow_name = workflow_name
+        self._workflow_context = None
+        self._execution = None
+        self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS
+        self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+
+    @property
+    def workflow_ctx(self):
+        if self._workflow_context is None:
+            self._workflow_context = WorkflowContext(
+                name=self.__class__.__name__,
+                model_storage=self._model,
+                resource_storage=self._resource,
+                service_id=self._execution.service.id,
+                execution_id=self._execution.id,
+                workflow_name=self._execution.workflow_name,
+                task_max_attempts=self._task_max_attempts,
+                task_retry_interval=self._task_retry_interval,
+            )
+        return self._workflow_context
+
+    def compile(self, execution_inputs=None, executor=None, execution_id=None):
+        assert not (execution_inputs and executor and execution_id)
+
+        if execution_id is None:
+            # If the execution is new
+            self._execution = self._create_execution_model(execution_inputs)
+            self._model.execution.put(self._execution)
+            self._create_tasks(executor)
+            self._model.execution.update(self._execution)
+        else:
+            # If resuming an execution
+            self._execution = self._model.execution.get(execution_id)
+
+        return self.workflow_ctx
+
+    def _create_tasks(self, executor=None):
+
+        # Set default executor and kwargs
+        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+        # transforming the execution inputs to dict, to pass them to the workflow function
+        execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues())
+
+        if len(self._execution.tasks) == 0:
+            workflow_fn = self._get_workflow_fn(self._execution.workflow_name)
+            self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
+            compiler = graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__)
+            compiler.compile(self._tasks_graph)
+
+    def _create_execution_model(self, inputs=None):
+        self._validate_workflow_exists_for_service()
+        self._validate_no_active_executions()
+
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service_fk=self._service.id,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any inputs
+        else:
+            workflow_inputs = self._service.workflows[self._workflow_name].inputs
+
+        modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                     supplied_inputs=inputs or {})
+        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                             supplied_inputs=inputs or {})
+        execution.inputs = modeling_utils.merge_parameter_values(
+            inputs, workflow_inputs, model_cls=models.Input)
+
+        return execution
+
+    def _validate_no_active_executions(self):
+        active_executions = [e for e in self._service.executions if
+                             e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution with ID {1}"
+                    .format(self._service.name, active_executions[0].id))
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self._service.workflows and \
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                    .format(self._workflow_name, self._service.name))
+
+    def _get_workflow_fn(self, workflow_name):
+        if workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    workflow_name))
+
+        workflow = self._service.workflows[workflow_name]
+
+        # TODO: Custom workflow support needs improvement, currently this code uses internal
+        # knowledge of the resource storage; Instead, workflows should probably be loaded
+        # in a similar manner to operation plugins. Also consider passing to import_fullname
+        # as paths instead of appending to sys path.
+        service_template_resources_path = os.path.join(
+            self._resource.service_template.base_path,
+            str(self._service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.function)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} function at {1}'.format(
+                    workflow_name, workflow.function))
+
+        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 0ec3cd8..0d7d2ae 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -36,9 +36,9 @@ class Engine(logger.LoggerMixin):
     Executes workflows.
     """
 
-    def __init__(self, executors, **kwargs):
+    def __init__(self, *executors, **kwargs):
         super(Engine, self).__init__(**kwargs)
-        self._executors = executors.copy()
+        self._executors = dict((e.__class__, e) for e in executors)
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
     def execute(self, ctx, resuming=False, retry_failed=False):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 780db07..257cbf7 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -27,6 +27,6 @@ def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
     graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
-    eng = engine.Engine(executors={executor.__class__: executor})
+    eng = engine.Engine(executor)
 
     eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 091e23c..6e9c950 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -49,7 +49,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution/test_execution_compiler.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py
index d8c8aa3..14332db 100644
--- a/tests/orchestrator/execution/test_execution_compiler.py
+++ b/tests/orchestrator/execution/test_execution_compiler.py
@@ -18,15 +18,13 @@ import time
 from threading import Thread, Event
 from datetime import datetime
 
-import mock
 import pytest
 
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
 from aria.orchestrator import events
-from aria.orchestrator import execution as orch_execution
-from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator import execution_compiler
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine, graph_compiler
 from aria.orchestrator.workflows.executor import thread
@@ -84,16 +82,16 @@ def test_missing_workflow_implementation(service, request):
 def test_builtin_workflow_instantiation(request, model):
     # validates the workflow runner instantiates properly when provided with a builtin workflow
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
-    execution = _get_compiler(request, 'install').compile()
-    assert len(execution.tasks) == 18  # expecting 18 tasks for 2 node topology
+    workflow_ctx = _get_compiler(request, 'install').compile()
+    assert len(workflow_ctx.execution.tasks) == 18  # expecting 18 tasks for 2 node topology
 
 
 def test_custom_workflow_instantiation(request):
     # validates the workflow runner instantiates properly when provided with a custom workflow
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
     mock_workflow = _setup_mock_workflow_in_service(request)
-    execution = _get_compiler(request, mock_workflow).compile()
-    assert len(execution.tasks) == 2  # mock workflow creates only start workflow and end workflow task
+    workflow_ctx = _get_compiler(request, mock_workflow).compile()
+    assert len(workflow_ctx.execution.tasks) == 2  # mock workflow creates only start workflow and end workflow task
 
 
 def test_existing_active_executions(request, service, model):
@@ -116,88 +114,15 @@ def test_existing_executions_but_no_active_ones(request, service, model):
     _get_compiler(request, 'install').compile()
 
 
-def test_default_executor(request):
-    # validates the ProcessExecutor is used by the workflow runner by default
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
-        execution = _get_compiler(request, mock_workflow).compile()
-        orch_execution.ExecutionRunner(ProcessExecutor())
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
-
-
-def test_custom_executor(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    custom_executor = mock.MagicMock()
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
-        execution = _get_compiler(request, mock_workflow).compile(executor=custom_executor)
-        orch_execution.ExecutionRunner(custom_executor)
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('executors').values()[0] == custom_executor
-
-
-def test_task_configuration_parameters(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    task_max_attempts = 5
-    task_retry_interval = 7
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute') as \
-            mock_engine_execute:
-        compiler = _get_compiler(request, mock_workflow)
-        execution = compiler.compile(task_max_attempts=task_max_attempts,
-                                     task_retry_interval=task_retry_interval)
-        orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx)
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
-        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
-
-
-def test_execute(request, service):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute',
-                    return_value=mock_engine) as mock_engine_execute:
-        compiler = _get_compiler(request, mock_workflow)
-        compiler.compile()
-
-        runner = orch_execution.runner.ExecutionRunner(ProcessExecutor())
-        runner.execute(compiler.workflow_ctx)
-
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx'].service.id == service.id
-        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
-
-        mock_engine_execute.assert_called_once_with(ctx=compiler.workflow_ctx,
-                                                    resuming=False,
-                                                    retry_failed=False)
-
-
-def test_cancel_execution(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine', return_value=mock_engine):
-        compiler = _get_compiler(request, mock_workflow)
-        compiler.compile()
-
-        runner = orch_execution.ExecutionRunner(ProcessExecutor())
-        runner.cancel(ctx=compiler.workflow_ctx)
-        mock_engine.cancel_execution.assert_called_once_with(compiler.workflow_ctx)
-
-
 def test_execution_model_creation(request, service):
     mock_workflow = _setup_mock_workflow_in_service(request)
 
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
-        execution = _get_compiler(request, mock_workflow).compile()
+    workflow_ctx = _get_compiler(request, mock_workflow).compile()
 
-        assert execution.service.id == service.id
-        assert execution.workflow_name == mock_workflow
-        assert execution.created_at <= datetime.utcnow()
-        assert execution.inputs == dict()
+    assert workflow_ctx.execution.service.id == service.id
+    assert workflow_ctx.execution.workflow_name == mock_workflow
+    assert workflow_ctx.execution.created_at <= datetime.utcnow()
+    assert workflow_ctx.execution.inputs == dict()
 
 
 def test_execution_inputs_override_workflow_inputs(request):
@@ -207,18 +132,17 @@ def test_execution_inputs_override_workflow_inputs(request):
         inputs=dict((name, models.Input.wrap(name, val)) for name, val
                     in wf_inputs.iteritems()))
 
-    with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
-        execution = _get_compiler(request, mock_workflow).compile(
-            execution_inputs={'input2': 'overriding-value2', 'input3': 7}
-        )
+    workflow_ctx = _get_compiler(request, mock_workflow).compile(
+        execution_inputs={'input2': 'overriding-value2', 'input3': 7}
+    )
 
-        assert len(execution.inputs) == 3
-        # did not override input1 - expecting the default value from the workflow inputs
-        assert execution.inputs['input1'].value == 'value1'
-        # overrode input2
-        assert execution.inputs['input2'].value == 'overriding-value2'
-        # overrode input of integer type
-        assert execution.inputs['input3'].value == 7
+    assert len(workflow_ctx.execution.inputs) == 3
+    # did not override input1 - expecting the default value from the workflow inputs
+    assert workflow_ctx.execution.inputs['input1'].value == 'value1'
+    # overrode input2
+    assert workflow_ctx.execution.inputs['input2'].value == 'overriding-value2'
+    # overrode input of integer type
+    assert workflow_ctx.execution.inputs['input3'].value == 7
 
 
 def test_execution_inputs_undeclared_inputs(request):
@@ -312,7 +236,7 @@ def _get_compiler(request, workflow_name):
     resource = request.getfixturevalue('resource')
     plugin_manager = request.getfixturevalue('plugin_manager')
 
-    return orch_execution.ExecutionCompiler(
+    return execution_compiler.ExecutionCompiler(
         model,
         resource,
         plugin_manager,
@@ -323,7 +247,7 @@ def _get_compiler(request, workflow_name):
 
 class TestResumableWorkflows(object):
 
-    def _create_initial_workflow_runner(
+    def _compile_execution(
             self,
             model,
             resource,
@@ -340,19 +264,19 @@ class TestResumableWorkflows(object):
             }
         )
         model.service.update(service)
-        compiler = orch_execution.ExecutionCompiler(
+        compiler = execution_compiler.ExecutionCompiler(
             model, resource, None, service, 'custom_workflow'
         )
-        execution = compiler.compile(inputs, executor)
-        model.execution.update(execution)
+        ctx = compiler.compile(inputs, executor)
+        model.execution.update(ctx.execution)
 
-        return orch_execution.ExecutionRunner(executor), compiler.workflow_ctx
+        return ctx
 
     @staticmethod
-    def _wait_for_active_and_cancel(execution_runner, ctx):
+    def _wait_for_active_and_cancel(eng, ctx):
         if custom_events['is_active'].wait(60) is False:
             raise TimeoutError("is_active wasn't set to True")
-        execution_runner.cancel(ctx)
+        eng.cancel_execution(ctx)
         if custom_events['execution_cancelled'].wait(60) is False:
             raise TimeoutError("Execution did not end")
 
@@ -360,7 +284,7 @@ class TestResumableWorkflows(object):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_pass_first_task_only)
-        runner, ctx = self._create_initial_workflow_runner(
+        ctx = self._compile_execution(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -369,12 +293,14 @@ class TestResumableWorkflows(object):
             inputs={'number_of_tasks': 2}
         )
 
-        wf_thread = Thread(target=runner.execute, kwargs=dict(ctx=ctx))
+        eng = engine.Engine(thread_executor)
+
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
         wf_thread.daemon = True
         wf_thread.start()
 
         # Wait for the execution to start
-        self._wait_for_active_and_cancel(runner, ctx)
+        self._wait_for_active_and_cancel(eng, ctx)
         node = ctx.model.node.refresh(node)
 
         tasks = ctx.model.task.list(filters={'_stub_type': None})
@@ -385,8 +311,8 @@ class TestResumableWorkflows(object):
 
         # Create a new workflow runner, with an existing execution id. This would cause
         # the old execution to restart.
-        new_wf_runner = orch_execution.ExecutionRunner(thread_executor, True)
-        new_wf_runner.execute(ctx)
+        new_engine = engine.Engine(thread_executor)
+        new_engine.execute(ctx, resuming=True)
 
         # Wait for it to finish and assert changes.
         node = workflow_context.model.node.refresh(node)
@@ -399,7 +325,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_stuck_task)
 
-        wf_runner, ctx = self._create_initial_workflow_runner(
+        ctx = self._compile_execution(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -408,11 +334,12 @@ class TestResumableWorkflows(object):
             inputs={'number_of_tasks': 1}
         )
 
-        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
         wf_thread.daemon = True
         wf_thread.start()
 
-        self._wait_for_active_and_cancel(wf_runner, ctx)
+        self._wait_for_active_and_cancel(eng, ctx)
         node = workflow_context.model.node.refresh(node)
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 1
@@ -422,9 +349,8 @@ class TestResumableWorkflows(object):
 
         new_thread_executor = thread.ThreadExecutor()
         try:
-            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True)
-
-            new_wf_runner.execute(ctx)
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
         finally:
             new_thread_executor.close()
 
@@ -439,17 +365,19 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_failed_before_resuming)
 
-        wf_runner, ctx = self._create_initial_workflow_runner(
+        ctx = self._compile_execution(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
             mock_parallel_tasks_workflow,
             thread_executor)
-        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
         wf_thread.setDaemon(True)
         wf_thread.start()
 
-        self._wait_for_active_and_cancel(wf_runner, ctx)
+        self._wait_for_active_and_cancel(eng, ctx)
         node = workflow_context.model.node.refresh(node)
 
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
@@ -464,9 +392,8 @@ class TestResumableWorkflows(object):
         # the old execution to restart.
         new_thread_executor = thread.ThreadExecutor()
         try:
-            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True)
-
-            new_wf_runner.execute(ctx)
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
         finally:
             new_thread_executor.close()
 
@@ -481,7 +408,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_pass_first_task_only)
 
-        wf_runner, ctx = self._create_initial_workflow_runner(
+        ctx = self._compile_execution(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -489,7 +416,8 @@ class TestResumableWorkflows(object):
             thread_executor,
             inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
         )
-        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
         wf_thread.setDaemon(True)
         wf_thread.start()
 
@@ -511,8 +439,8 @@ class TestResumableWorkflows(object):
         custom_events['is_resumed'].set()
         new_thread_executor = thread.ThreadExecutor()
         try:
-            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True, True)
-            new_wf_runner.execute(ctx)
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True, retry_failed=True)
         finally:
             new_thread_executor.close()
 
@@ -528,7 +456,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_fail_first_task_only)
 
-        execution_runner, ctx = self._create_initial_workflow_runner(
+        ctx = self._compile_execution(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -536,7 +464,8 @@ class TestResumableWorkflows(object):
             thread_executor,
             inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
         )
-        wf_thread = Thread(target=execution_runner.execute, kwargs=dict(ctx=ctx))
+        eng = engine.Engine(thread_executor)
+        wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
         wf_thread.setDaemon(True)
         wf_thread.start()
 
@@ -552,8 +481,8 @@ class TestResumableWorkflows(object):
         custom_events['is_resumed'].set()
         new_thread_executor = thread.ThreadExecutor()
         try:
-            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True)
-            new_wf_runner.execute(ctx)
+            new_engine = engine.Engine(new_thread_executor)
+            new_engine.execute(ctx, resuming=True)
         finally:
             new_thread_executor.close()
 
@@ -603,7 +532,7 @@ class TestResumableWorkflows(object):
         graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
         workflow_context.execution = execution
 
-        return engine.Engine(executors={executor.__class__: executor})
+        return engine.Engine(executor)
 
     @pytest.fixture(autouse=True)
     def register_to_events(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 599383d..fad05de 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -502,7 +502,7 @@ if __name__ == '__main__':
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
         graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(
             tasks_graph)
-        eng = engine.Engine({executor.__class__: executor})
+        eng = engine.Engine(executor)
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index b5df939..e39f3ba 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -263,7 +263,7 @@ class TestWithActualSSHServer(object):
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
         graph_compiler.GraphCompiler(
             self._workflow_context, self._executor.__class__).compile(tasks_graph)
-        eng = engine.Engine({self._executor.__class__: self._executor})
+        eng = engine.Engine(self._executor)
         eng.execute(self._workflow_context)
         return self._workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 0c704f5..2c2a06a 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -52,7 +52,7 @@ class BaseTest(object):
         graph = workflow_func(ctx=workflow_context)
         graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
 
-        return engine.Engine(executors={executor.__class__: executor})
+        return engine.Engine(executor)
 
     @staticmethod
     def _create_interface(ctx, func, arguments=None):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d804de5..bb5bb75 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -132,7 +132,7 @@ def run_operation_on_node(ctx, op_name, interface_name, executor):
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
     )
 
-    eng = engine.Engine(executors={executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(ctx)
     return node
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index b26fa43..0093976 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -58,7 +58,7 @@ def test_decorate_extension(context, executor):
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 47ee2f7..8aaf4ef 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -108,7 +108,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None