Posted to commits@ariatosca.apache.org by mx...@apache.org on 2017/11/20 08:14:11 UTC

[1/2] incubator-ariatosca git commit: wip 2 [Forced Update!]

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/extract_execution_creation_from_workflow_runner 5b35934cd -> da233c730 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
deleted file mode 100644
index 011c4cc..0000000
--- a/tests/orchestrator/test_workflow_runner.py
+++ /dev/null
@@ -1,726 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import time
-from threading import Thread, Event
-from datetime import datetime
-
-import mock
-import pytest
-
-from aria.modeling import exceptions as modeling_exceptions
-from aria.modeling import models
-from aria.orchestrator import exceptions
-from aria.orchestrator import events
-from aria.orchestrator.workflow_runner import WorkflowRunner
-from aria.orchestrator.workflows.executor.process import ProcessExecutor
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.executor import thread
-from aria.orchestrator import (
-    workflow,
-    operation,
-)
-
-from tests import (
-    mock as tests_mock,
-    storage
-)
-
-from ..fixtures import (  # pylint: disable=unused-import
-    plugins_dir,
-    plugin_manager,
-    fs_model as model,
-    resource_storage as resource
-)
-
-custom_events = {
-    'is_resumed': Event(),
-    'is_active': Event(),
-    'execution_cancelled': Event(),
-    'execution_failed': Event(),
-}
-
-
-class TimeoutError(BaseException):
-    pass
-
-
-class FailingTask(BaseException):
-    pass
-
-
-def test_undeclared_workflow(request):
-    # validating a proper error is raised when the workflow is not declared in the service
-    with pytest.raises(exceptions.UndeclaredWorkflowError):
-        _create_workflow_runner(request, 'undeclared_workflow')
-
-
-def test_missing_workflow_implementation(service, request):
-    # validating a proper error is raised when the workflow code path does not exist
-    workflow = models.Operation(
-        name='test_workflow',
-        service=service,
-        function='nonexistent.workflow.implementation')
-    service.workflows['test_workflow'] = workflow
-
-    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
-        _create_workflow_runner(request, 'test_workflow')
-
-
-def test_builtin_workflow_instantiation(request):
-    # validates the workflow runner instantiates properly when provided with a builtin workflow
-    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
-    workflow_runner = _create_workflow_runner(request, 'install')
-    tasks = list(workflow_runner.execution.tasks)
-    assert len(tasks) == 18  # expecting 18 tasks for 2 node topology
-
-
-def test_custom_workflow_instantiation(request):
-    # validates the workflow runner instantiates properly when provided with a custom workflow
-    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
-    mock_workflow = _setup_mock_workflow_in_service(request)
-    workflow_runner = _create_workflow_runner(request, mock_workflow)
-    tasks = list(workflow_runner.execution.tasks)
-    assert len(tasks) == 2  # mock workflow creates only start workflow and end workflow task
-
-
-def test_existing_active_executions(request, service, model):
-    existing_active_execution = models.Execution(
-        service=service,
-        status=models.Execution.STARTED,
-        workflow_name='uninstall')
-    model.execution.put(existing_active_execution)
-    with pytest.raises(exceptions.ActiveExecutionsError):
-        _create_workflow_runner(request, 'install')
-
-
-def test_existing_executions_but_no_active_ones(request, service, model):
-    existing_terminated_execution = models.Execution(
-        service=service,
-        status=models.Execution.SUCCEEDED,
-        workflow_name='uninstall')
-    model.execution.put(existing_terminated_execution)
-    # no active executions exist, so no error should be raised
-    _create_workflow_runner(request, 'install')
-
-
-def test_default_executor(request):
-    # validates the ProcessExecutor is used by the workflow runner by default
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
-        _create_workflow_runner(request, mock_workflow)
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
-
-
-def test_custom_executor(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    custom_executor = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
-        _create_workflow_runner(request, mock_workflow, executor=custom_executor)
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('executors').values()[0] == custom_executor
-
-
-def test_task_configuration_parameters(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    task_max_attempts = 5
-    task_retry_interval = 7
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute') as \
-            mock_engine_execute:
-        _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
-                                task_retry_interval=task_retry_interval).execute()
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
-        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
-
-
-def test_execute(request, service):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute',
-                    return_value=mock_engine) as mock_engine_execute:
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-        workflow_runner.execute()
-
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx'].service.id == service.id
-        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
-
-        mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
-                                                    resuming=False,
-                                                    retry_failed=False)
-
-
-def test_cancel_execution(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine', return_value=mock_engine):
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-        workflow_runner.cancel()
-        mock_engine.cancel_execution.assert_called_once_with(ctx=workflow_runner._workflow_context)
-
-
-def test_execution_model_creation(request, service, model):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-
-        assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
-        assert workflow_runner.execution.service.id == service.id
-        assert workflow_runner.execution.workflow_name == mock_workflow
-        assert workflow_runner.execution.created_at <= datetime.utcnow()
-        assert workflow_runner.execution.inputs == dict()
-
-
-def test_execution_inputs_override_workflow_inputs(request):
-    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
-    mock_workflow = _setup_mock_workflow_in_service(
-        request,
-        inputs=dict((name, models.Input.wrap(name, val)) for name, val
-                    in wf_inputs.iteritems()))
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
-        workflow_runner = _create_workflow_runner(
-            request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})
-
-        assert len(workflow_runner.execution.inputs) == 3
-        # did not override input1 - expecting the default value from the workflow inputs
-        assert workflow_runner.execution.inputs['input1'].value == 'value1'
-        # overrode input2
-        assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
-        # overrode input of integer type
-        assert workflow_runner.execution.inputs['input3'].value == 7
-
-
-def test_execution_inputs_undeclared_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'})
-
-
-def test_execution_inputs_missing_required_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
-
-    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
-        _create_workflow_runner(request, mock_workflow, inputs={})
-
-
-def test_execution_inputs_wrong_type_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs={'input': models.Input.wrap('input', 'value')})
-
-    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
-        _create_workflow_runner(request, mock_workflow, inputs={'input': 5})
-
-
-def test_execution_inputs_builtin_workflow_with_inputs(request):
-    # built-in workflows don't have inputs
-    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})
-
-
-def test_workflow_function_parameters(request, tmpdir):
-    # validates that the workflow function is passed the
-    # merged execution inputs, in dict form
-
-    # the workflow function parameters will be written to this file
-    output_path = str(tmpdir.join('output'))
-    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
-
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
-                             in wf_inputs.iteritems()))
-
-    _create_workflow_runner(request, mock_workflow,
-                            inputs={'input2': 'overriding-value2', 'input3': 7})
-
-    with open(output_path) as f:
-        wf_call_kwargs = json.load(f)
-    assert len(wf_call_kwargs) == 3
-    assert wf_call_kwargs.get('input1') == 'value1'
-    assert wf_call_kwargs.get('input2') == 'overriding-value2'
-    assert wf_call_kwargs.get('input3') == 7
-
-
-@pytest.fixture
-def service(model):
-    # sets up a service in the storage
-    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
-    service = model.service.get(service_id)
-    return service
-
-
-def _setup_mock_workflow_in_service(request, inputs=None):
-    # sets up a mock workflow as part of the service, including uploading
-    # the workflow code to the service's dir on the resource storage
-    service = request.getfixturevalue('service')
-    resource = request.getfixturevalue('resource')
-
-    source = tests_mock.workflow.__file__
-    resource.service_template.upload(str(service.service_template.id), source)
-    mock_workflow_name = 'test_workflow'
-    arguments = {}
-    if inputs:
-        for input in inputs.itervalues():
-            arguments[input.name] = input.as_argument()
-    workflow = models.Operation(
-        name=mock_workflow_name,
-        service=service,
-        function='workflow.mock_workflow',
-        inputs=inputs or {},
-        arguments=arguments)
-    service.workflows[mock_workflow_name] = workflow
-    return mock_workflow_name
-
-
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
-                            task_max_attempts=None, task_retry_interval=None):
-    # helper method for instantiating a workflow runner
-    service_id = request.getfixturevalue('service').id
-    model = request.getfixturevalue('model')
-    resource = request.getfixturevalue('resource')
-    plugin_manager = request.getfixturevalue('plugin_manager')
-
-    # task configuration parameters can't be set to None, therefore only
-    # passing those if they've been set by the test
-    task_configuration_kwargs = dict()
-    if task_max_attempts is not None:
-        task_configuration_kwargs['task_max_attempts'] = task_max_attempts
-    if task_retry_interval is not None:
-        task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
-    return WorkflowRunner(
-        workflow_name=workflow_name,
-        service_id=service_id,
-        inputs=inputs or {},
-        executor=executor,
-        model_storage=model,
-        resource_storage=resource,
-        plugin_manager=plugin_manager,
-        **task_configuration_kwargs)
-
-
-class TestResumableWorkflows(object):
-
-    def _create_initial_workflow_runner(
-            self, workflow_context, workflow, executor, inputs=None):
-
-        service = workflow_context.service
-        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
-            'custom_workflow',
-            operation_kwargs={
-                'function': '{0}.{1}'.format(__name__, workflow.__name__),
-                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
-            }
-        )
-        workflow_context.model.service.update(service)
-
-        wf_runner = WorkflowRunner(
-            service_id=workflow_context.service.id,
-            inputs=inputs or {},
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            plugin_manager=None,
-            workflow_name='custom_workflow',
-            executor=executor)
-        return wf_runner
-
-    @staticmethod
-    def _wait_for_active_and_cancel(workflow_runner):
-        if custom_events['is_active'].wait(60) is False:
-            raise TimeoutError("is_active wasn't set to True")
-        workflow_runner.cancel()
-        if custom_events['execution_cancelled'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-    def test_resume_workflow(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_tasks_workflow, thread_executor,
-            inputs={'number_of_tasks': 2})
-
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.daemon = True
-        wf_thread.start()
-
-        # Wait for the execution to start
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status == task.RETRYING for task in tasks)
-        custom_events['is_resumed'].set()
-        assert any(task.status == task.RETRYING for task in tasks)
-
-        # Create a new workflow runner, with an existing execution id. This would cause
-        # the old execution to restart.
-        new_wf_runner = WorkflowRunner(
-            service_id=wf_runner.service.id,
-            inputs={},
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            plugin_manager=None,
-            execution_id=wf_runner.execution.id,
-            executor=thread_executor)
-
-        new_wf_runner.execute()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert all(task.status == task.SUCCESS for task in tasks)
-        assert node.attributes['invocations'].value == 3
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_started_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_stuck_task)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_tasks_workflow, thread_executor,
-            inputs={'number_of_tasks': 1})
-
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.daemon = True
-        wf_thread.start()
-
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
-        assert node.attributes['invocations'].value == 1
-        assert task.status == task.STARTED
-        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
-                                              wf_runner.execution.CANCELLING)
-        custom_events['is_resumed'].set()
-
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 2
-        assert task.status == task.SUCCESS
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_failed_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_before_resuming)
-
-        wf_runner = self._create_initial_workflow_runner(workflow_context,
-                                                         mock_parallel_tasks_workflow,
-                                                         thread_executor)
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-
-        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
-        assert node.attributes['invocations'].value == 2
-        assert task.status == task.STARTED
-        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
-                                              wf_runner.execution.CANCELLING)
-
-        custom_events['is_resumed'].set()
-        assert node.attributes['invocations'].value == 2
-
-        # Create a new workflow runner, with an existing execution id. This would cause
-        # the old execution to restart.
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == task.max_attempts - 1
-        assert task.status == task.SUCCESS
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context,
-            mock_parallel_tasks_workflow,
-            thread_executor,
-            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
-        )
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        if custom_events['execution_failed'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 3
-        failed_task = [t for t in tasks if t.status == t.FAILED][0]
-
-        # Second task fails
-        assert any(task.status == task.FAILED for task in tasks)
-        assert failed_task.attempts_count == 2
-        # First task passes
-        assert any(task.status == task.SUCCESS for task in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.FAILED
-
-        custom_events['is_resumed'].set()
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                retry_failed_tasks=True,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert failed_task.attempts_count == 1
-        assert node.attributes['invocations'].value == 4
-        assert all(task.status == task.SUCCESS for task in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_fail_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context,
-            mock_sequential_tasks_workflow,
-            thread_executor,
-            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
-        )
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        if custom_events['execution_failed'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 1
-        assert any(t.status == t.FAILED for t in tasks)
-        assert any(t.status == t.PENDING for t in tasks)
-
-        custom_events['is_resumed'].set()
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 2
-        assert any(t.status == t.SUCCESS for t in tasks)
-        assert any(t.status == t.FAILED for t in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-
-
-    @staticmethod
-    @pytest.fixture
-    def thread_executor():
-        result = thread.ThreadExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @staticmethod
-    @pytest.fixture
-    def workflow_context(tmpdir):
-        workflow_context = tests_mock.context.simple(str(tmpdir))
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-    @staticmethod
-    def _create_interface(ctx, node, func, arguments=None):
-        interface_name = 'aria.interfaces.lifecycle'
-        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
-            name=__name__, func=func))
-        if arguments:
-            # the operation has to declare the arguments before those may be passed
-            operation_kwargs['arguments'] = arguments
-        operation_name = 'create'
-        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
-                                                       operation_kwargs=operation_kwargs)
-        node.interfaces[interface.name] = interface
-        ctx.model.node.update(node)
-
-        return node, interface_name, operation_name
-
-    @staticmethod
-    def _engine(workflow_func, workflow_context, executor):
-        graph = workflow_func(ctx=workflow_context)
-        execution = workflow_context.execution
-        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
-        workflow_context.execution = execution
-
-        return engine.Engine(executors={executor.__class__: executor})
-
-    @pytest.fixture(autouse=True)
-    def register_to_events(self):
-        def execution_cancelled(*args, **kwargs):
-            custom_events['execution_cancelled'].set()
-
-        def execution_failed(*args, **kwargs):
-            custom_events['execution_failed'].set()
-
-        events.on_cancelled_workflow_signal.connect(execution_cancelled)
-        events.on_failure_workflow_signal.connect(execution_failed)
-        yield
-        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
-        events.on_failure_workflow_signal.disconnect(execution_failed)
-        for event in custom_events.values():
-            event.clear()
-
-
-@workflow
-def mock_sequential_tasks_workflow(ctx, graph,
-                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
-    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-@workflow
-def mock_parallel_tasks_workflow(ctx, graph,
-                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
-    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
-    return [
-        api.task.OperationTask(node,
-                               'aria.interfaces.lifecycle',
-                               'create',
-                               retry_interval=retry_interval,
-                               max_attempts=max_attempts)
-        for _ in xrange(number_of_tasks)
-    ]
-
-
-
-@operation
-def mock_failed_before_resuming(ctx):
-    """
-    The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
-    overall, the number of invocations should be ctx.task.max_attempts - 1
-    """
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] == 2:
-        custom_events['is_active'].set()
-        # unfreeze the thread only when all of the invocations are done
-        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
-            time.sleep(5)
-
-    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
-        # pass only just before the end.
-        return
-    else:
-        # fail otherwise
-        raise FailingTask("stop this task")
-
-
-@operation
-def mock_stuck_task(ctx):
-    ctx.node.attributes['invocations'] += 1
-    while not custom_events['is_resumed'].isSet():
-        if not custom_events['is_active'].isSet():
-            custom_events['is_active'].set()
-        time.sleep(5)
-
-
-@operation
-def mock_pass_first_task_only(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] != 1:
-        custom_events['is_active'].set()
-        if not custom_events['is_resumed'].isSet():
-            # if resume was called, increase by one; otherwise fail the execution - the second
-            # task should fail as long as it was not part of resuming the workflow
-            raise FailingTask("wasn't resumed yet")
-
-
-@operation
-def mock_fail_first_task_only(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
-        raise FailingTask("First task should fail")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 0c704f5..b63416c 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -50,7 +50,7 @@ class BaseTest(object):
     @staticmethod
     def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
-        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+        graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
 
         return engine.Engine(executors={executor.__class__: executor})
 
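The recurring one-line change across this and the following test diffs is the GraphCompiler constructor: it now receives the Execution model instead of the whole workflow context. A minimal sketch of the updated helper, assuming the test module's existing imports:

    from aria.orchestrator.workflows.core import engine, graph_compiler

    def _engine(workflow_func, workflow_context, executor):
        graph = workflow_func(ctx=workflow_context)
        # GraphCompiler is now constructed from the execution model itself
        graph_compiler.GraphCompiler(
            workflow_context.execution, executor.__class__).compile(graph)
        return engine.Engine(executors={executor.__class__: executor})
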

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d804de5..4c1e189 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -128,7 +128,7 @@ def run_operation_on_node(ctx, op_name, interface_name, executor):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
-    graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile(
+    graph_compiler.GraphCompiler(ctx.execution, ThreadExecutor).compile(
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 9f072f6..3770c13 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -80,7 +80,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(inner_task_graph, simple_before_task)
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
-    compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor)
+    compiler = graph_compiler.GraphCompiler(workflow_context.execution, base.StubTaskExecutor)
     compiler.compile(test_task_graph)
 
     execution_tasks = tuple(topological_sort(_graph(workflow_context.execution.tasks)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index b26fa43..0e10073 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -57,7 +57,7 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+    graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
     eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
     out = get_node(context).attributes.get('out').value

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 47ee2f7..5458358 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+    graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
     eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')


[2/2] incubator-ariatosca git commit: wip 2

Posted by mx...@apache.org.
wip 2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/da233c73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/da233c73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/da233c73

Branch: refs/heads/extract_execution_creation_from_workflow_runner
Commit: da233c730aee305806aaa83177b6b0dbd852c264
Parents: 9611f61
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Nov 19 15:56:32 2017 +0200
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Mon Nov 20 10:14:05 2017 +0200

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  44 +-
 aria/orchestrator/execution/__init__.py         |  17 +
 aria/orchestrator/execution/compiler.py         | 149 ++++
 aria/orchestrator/execution/runner.py           |  50 ++
 aria/orchestrator/workflow_runner.py            | 210 ------
 .../workflows/core/graph_compiler.py            |  12 +-
 test_ssh.py                                     | 528 --------------
 tests/orchestrator/context/__init__.py          |   2 +-
 tests/orchestrator/context/test_serialize.py    |   2 +-
 tests/orchestrator/execution/__init__.py        |  14 +
 .../execution/test_execution_compiler.py        | 698 ++++++++++++++++++
 .../orchestrator/execution_plugin/test_local.py |   3 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   2 +-
 tests/orchestrator/test_workflow_runner.py      | 726 -------------------
 .../orchestrator/workflows/core/test_engine.py  |   2 +-
 .../orchestrator/workflows/core/test_events.py  |   2 +-
 .../test_task_graph_into_execution_graph.py     |   2 +-
 .../executor/test_process_executor_extension.py |   2 +-
 .../test_process_executor_tracked_changes.py    |   2 +-
 19 files changed, 962 insertions(+), 1505 deletions(-)
----------------------------------------------------------------------
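
For orientation, here is a hedged sketch of the flow this commit introduces, using only names that appear in the diffs below: ExecutionCompiler validates inputs, persists the Execution model and compiles its task graph, while ExecutionRunner merely drives the engine over the compiled workflow context. The storage objects and service are assumed to be set up as in the CLI command.

    from aria.orchestrator import execution
    from aria.orchestrator.workflows.executor.process import ProcessExecutor

    def start(model_storage, resource_storage, plugin_manager, service,
              workflow_name, inputs):
        executor = ProcessExecutor(plugin_manager=plugin_manager)

        # compile() validates the inputs, creates the Execution model and
        # builds the task graph for the chosen executor
        compiler = execution.ExecutionCompiler(
            model_storage, resource_storage, plugin_manager, service,
            workflow_name)
        new_execution = compiler.compile(inputs, executor=executor)
        model_storage.execution.put(new_execution)

        # the compiler keeps the workflow context it built for the execution
        runner = execution.ExecutionRunner(executor=executor)
        runner.execute(ctx=compiler.workflow_ctx)
        return new_execution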


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 2ab3a33..162abfa 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -26,7 +26,7 @@ from .. import logger as cli_logger
 from .. import execution_logging
 from ..core import aria
 from ...modeling.models import Execution
-from ...orchestrator import workflow_runner
+from ...orchestrator import execution
 from ...orchestrator.workflows.executor.dry import DryExecutor
 from ...utils import formatting
 from ...utils import threading
@@ -143,20 +143,19 @@ def start(workflow_name,
     service = model_storage.service.get_by_name(service_name)
     executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
 
-    execution = workflow_runner.create_execution_model(
-        service, workflow_name, inputs
-    )
-    model_storage.execution.put(execution)
-
-    wf_runner = \
-        workflow_runner.WorkflowRunner(
-            model_storage, resource_storage, plugin_manager,
-            execution=execution, executor=executor,
-            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
-        )
+    new_execution = execution.ExecutionCompiler(
+        model_storage, 
+        resource_storage, 
+        plugin_manager, 
+        service, 
+        workflow_name
+    ).compile(inputs, executor=executor)
+    model_storage.execution.put(new_execution)
+
+    execution_runner = execution.ExecutionRunner(executor=executor)
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
 
-    _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
 
 
 @executions.command(name='resume',
@@ -185,23 +184,18 @@ def resume(execution_id,
     """
     executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
 
-    execution = model_storage.execution.get(execution_id)
-    if execution.status != execution.CANCELLED:
+    execution_to_resume = model_storage.execution.get(execution_id)
+    if execution_to_resume.status != execution_to_resume.CANCELLED:
         logger.info("Can't resume execution {execution.id} - "
                     "execution is in status {execution.status}. "
-                    "Can only resume executions in status {valid_status}"
-                    .format(execution=execution, valid_status=execution.CANCELLED))
+                    "Can only resume executions in status {execution_to_resume.CANCELLED}"
+                    .format(execution=execution_to_resume))
         return
-
-    wf_runner = \
-        workflow_runner.WorkflowRunner(
-            model_storage, resource_storage, plugin_manager,
-            execution=execution, executor=executor, resume=True,
-            retry_failed_tasks=retry_failed_tasks,
-        )
+    
+    execution_runner = execution.ExecutionRunner(executor, True, retry_failed_tasks)
 
     logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
-    _run_execution(wf_runner, logger, model_storage, dry, mark_pattern)
+    _run_execution(execution_runner, logger, model_storage, dry, mark_pattern)
 
 
 def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/__init__.py b/aria/orchestrator/execution/__init__.py
new file mode 100644
index 0000000..ef17fde
--- /dev/null
+++ b/aria/orchestrator/execution/__init__.py
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .compiler import ExecutionCompiler
+from .runner import ExecutionRunner
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/compiler.py b/aria/orchestrator/execution/compiler.py
new file mode 100644
index 0000000..5db52d4
--- /dev/null
+++ b/aria/orchestrator/execution/compiler.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from .. import exceptions
+from ..context.workflow import WorkflowContext
+from ..workflows import builtin
+from ..workflows.core import graph_compiler
+from ..workflows.executor.process import ProcessExecutor
+from ...modeling import models
+from ...modeling import utils as modeling_utils
+from ...utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionCompiler(object):
+    def __init__(self, model, resource, plugin, service, workflow_name):
+        self._model = model
+        self._resource = resource
+        self._plugin = plugin
+        self._service = service
+        self._workflow_name = workflow_name
+        self._workflow_context = None
+
+    @property
+    def workflow_ctx(self):
+        return self._workflow_context
+
+    def compile(
+            self,
+            execution_inputs=None,
+            executor=None,
+            task_max_attempts=None,
+            task_retry_interval=None):
+
+        execution = self._create_execution_model(execution_inputs)
+        self._model.execution.put(execution)
+        self._set_ctx(execution, task_max_attempts, task_retry_interval)
+        self._create_tasks(execution, executor=executor)
+        self._model.execution.update(execution)
+        return execution
+
+    def _set_ctx(self, execution, task_max_attempts=None, task_retry_interval=None):
+        self._workflow_context = WorkflowContext(
+                name=self.__class__.__name__,
+                model_storage=self._model,
+                resource_storage=self._resource,
+                service_id=execution.service.id,
+                execution_id=execution.id,
+                workflow_name=execution.workflow_name,
+                task_max_attempts=task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS,
+                task_retry_interval=task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL
+            )
+
+    def _create_tasks(self, execution, executor=None):
+
+        # Set default executor
+        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+        # transforming the execution inputs to dict, to pass them to the workflow function
+        execution_inputs_dict = dict(inp.unwrapped for inp in execution.inputs.itervalues())
+
+        if len(execution.tasks) == 0:
+            workflow_fn = self._get_workflow_fn(execution.workflow_name)
+            self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict)
+            compiler = graph_compiler.GraphCompiler(self.workflow_ctx.execution, executor.__class__)
+            compiler.compile(self._tasks_graph)
+
+    def _create_execution_model(self, inputs=None):
+        self._validate_workflow_exists_for_service()
+        self._validate_no_active_executions()
+
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service_fk=self._service.id,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any inputs
+        else:
+            workflow_inputs = self._service.workflows[self._workflow_name].inputs
+
+        modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                     supplied_inputs=inputs or {})
+        modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                             supplied_inputs=inputs or {})
+        execution.inputs = modeling_utils.merge_parameter_values(
+            inputs, workflow_inputs, model_cls=models.Input)
+
+        return execution
+
+    def _validate_no_active_executions(self):
+        active_executions = [e for e in self._service.executions if
+                             e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution with ID {1}"
+                    .format(self._service.name, active_executions[0].id))
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self._service.workflows and \
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                    .format(self._workflow_name, self._service.name))
+
+    def _get_workflow_fn(self, workflow_name):
+        if workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    workflow_name))
+
+        workflow = self._service.workflows[workflow_name]
+
+        # TODO: Custom workflow support needs improvement, currently this code uses internal
+        # knowledge of the resource storage; Instead, workflows should probably be loaded
+        # in a similar manner to operation plugins. Also consider passing to import_fullname
+        # as paths instead of appending to sys path.
+        service_template_resources_path = os.path.join(
+            self._resource.service_template.base_path,
+            str(self._service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.function)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} function at {1}'.format(
+                    workflow_name, workflow.function))
+
+        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution/runner.py b/aria/orchestrator/execution/runner.py
new file mode 100644
index 0000000..a532901
--- /dev/null
+++ b/aria/orchestrator/execution/runner.py
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Running workflows.
+"""
+
+from ..workflows.core import engine
+
+
+class ExecutionRunner(object):
+
+    def __init__(self, executor, resume=False, retry_failed_tasks=False):
+        """
+        Manages a single workflow execution on a given service.
+
+        :param workflow_name: workflow name
+        :param service_id: service ID
+        :param inputs: key-value dict of inputs for the execution
+        :param model_storage: model storage API ("MAPI")
+        :param resource_storage: resource storage API ("RAPI")
+        :param plugin_manager: plugin manager
+        :param executor: executor for tasks; defaults to a
+         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
+        :param task_max_attempts: maximum attempts of repeating each failing task
+        :param task_retry_interval: retry interval between retry attempts of a failing task
+        """
+
+        self._is_resume = resume
+        self._retry_failed_tasks = retry_failed_tasks
+        self._engine = engine.Engine(executors={executor.__class__: executor})
+
+    def execute(self, ctx):
+        self._engine.execute(
+            ctx=ctx, resuming=self._is_resume, retry_failed=self._retry_failed_tasks)
+
+    def cancel(self, ctx):
+        self._engine.cancel_execution(ctx)

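A hedged usage sketch for the class above, mirroring the resume path in the executions.py diff earlier; workflow_context is assumed to be a WorkflowContext already bound to the cancelled execution (its construction is not shown in this commit):

    from aria.orchestrator.execution import ExecutionRunner
    from aria.orchestrator.workflows.executor.process import ProcessExecutor

    executor = ProcessExecutor(plugin_manager=None)
    runner = ExecutionRunner(executor, resume=True, retry_failed_tasks=True)
    runner.execute(ctx=workflow_context)    # replay the cancelled execution
    # runner.cancel(ctx=workflow_context)   # or cancel a running one
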
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
deleted file mode 100644
index aebde8e..0000000
--- a/aria/orchestrator/workflow_runner.py
+++ /dev/null
@@ -1,210 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Running workflows.
-"""
-
-import os
-import sys
-from datetime import datetime
-
-from . import exceptions
-from .context.workflow import WorkflowContext
-from .workflows import builtin
-from .workflows.core import engine, graph_compiler
-from .workflows.executor.process import ProcessExecutor
-from ..modeling import models
-from ..modeling import utils as modeling_utils
-from ..utils.imports import import_fullname
-
-
-DEFAULT_TASK_MAX_ATTEMPTS = 30
-DEFAULT_TASK_RETRY_INTERVAL = 30
-
-
-class WorkflowRunner(object):
-
-    def __init__(
-            self,
-            model_storage,
-            resource_storage,
-            plugin_manager,
-            execution,
-            executor=None,
-            resume=False,
-            retry_failed_tasks=False,
-            task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
-            task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL
-    ):
-        """
-        Manages a single workflow execution on a given service.
-
-        :param workflow_name: workflow name
-        :param service_id: service ID
-        :param inputs: key-value dict of inputs for the execution
-        :param model_storage: model storage API ("MAPI")
-        :param resource_storage: resource storage API ("RAPI")
-        :param plugin_manager: plugin manager
-        :param executor: executor for tasks; defaults to a
-         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
-        :param task_max_attempts: maximum attempts of repeating each failing task
-        :param task_retry_interval: retry interval between retry attempts of a failing task
-        """
-
-        self._is_resume = resume
-        self._retry_failed_tasks = retry_failed_tasks
-
-        self._model_storage = model_storage
-        self._resource_storage = resource_storage
-        self._execution = execution
-
-        # the IDs are stored rather than the models themselves, so this module could be used
-        # by several threads without raising errors on model objects shared between threads
-
-        self._workflow_context = WorkflowContext(
-            name=self.__class__.__name__,
-            model_storage=self._model_storage,
-            resource_storage=resource_storage,
-            service_id=self.execution.service.id,
-            execution_id=self.execution.id,
-            workflow_name=self.execution.workflow_name,
-            task_max_attempts=task_max_attempts,
-            task_retry_interval=task_retry_interval)
-
-        # Set default executor and kwargs
-        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
-
-        # transforming the execution inputs to dict, to pass them to the workflow function
-        execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues())
-
-        if not self._is_resume:
-            workflow_fn = self._get_workflow_fn()
-            self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-            compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
-            compiler.compile(self._tasks_graph)
-
-        self._engine = engine.Engine(executors={executor.__class__: executor})
-
-    @property
-    def execution_id(self):
-        return self._execution.id
-
-    @property
-    def execution(self):
-        return self._model_storage.execution.get(self.execution_id)
-
-    @property
-    def service(self):
-        return self._model_storage.service.get(self._execution.service.id)
-
-    def execute(self):
-        self._engine.execute(ctx=self._workflow_context,
-                             resuming=self._is_resume,
-                             retry_failed=self._retry_failed_tasks)
-
-    def cancel(self):
-        self._engine.cancel_execution(ctx=self._workflow_context)
-
-    def _get_workflow_fn(self):
-        if self.execution.workflow_name in builtin.BUILTIN_WORKFLOWS:
-            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
-                                                    self.execution.workflow_name))
-
-        workflow = self.service.workflows[self.execution.workflow_name]
-
-        # TODO: Custom workflow support needs improvement, currently this code uses internal
-        # knowledge of the resource storage; Instead, workflows should probably be loaded
-        # in a similar manner to operation plugins. Also consider passing to import_fullname
-        # as paths instead of appending to sys path.
-        service_template_resources_path = os.path.join(
-            self._resource_storage.service_template.base_path,
-            str(self.service.service_template.id))
-        sys.path.append(service_template_resources_path)
-
-        try:
-            workflow_fn = import_fullname(workflow.function)
-        except ImportError:
-            raise exceptions.WorkflowImplementationNotFoundError(
-                'Could not find workflow {0} function at {1}'.format(
-                    self.execution.workflow_name, workflow.function))
-
-        return workflow_fn
-
-
-def create_execution_model(service, workflow_name, inputs):
-    _validate_workflow_exists_for_service(service, workflow_name)
-    # TODO: this validation and persisting the resulting execution should execute atomically
-    _validate_no_active_executions(service)
-    execution = models.Execution(
-        created_at=datetime.utcnow(),
-        service_fk=service.id,
-        workflow_name=workflow_name,
-        inputs={})
-
-    if workflow_name in builtin.BUILTIN_WORKFLOWS:
-        workflow_inputs = dict()  # built-in workflows don't have any inputs
-    else:
-        workflow_inputs = service.workflows[workflow_name].inputs
-
-    modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
-                                                 supplied_inputs=inputs or {})
-    modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
-                                                         supplied_inputs=inputs or {})
-    execution.inputs = modeling_utils.merge_parameter_values(
-        inputs, workflow_inputs, model_cls=models.Input)
-
-    return execution
-
-
-def _validate_no_active_executions(service):
-    active_executions = [e for e in service.executions if e.is_active()]
-    if active_executions:
-        raise exceptions.ActiveExecutionsError(
-            "Can't start execution; Service {0} has an active execution with ID {1}"
-            .format(service.name, active_executions[0].id))
-
-
-def _validate_workflow_exists_for_service(service, workflow_name):
-    if workflow_name not in service.workflows and \
-            workflow_name not in builtin.BUILTIN_WORKFLOWS:
-        raise exceptions.UndeclaredWorkflowError(
-            'No workflow policy {0} declared in service {1}'
-            .format(workflow_name, service.name))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflows/core/graph_compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py
index 81543d5..aeb05bb 100644
--- a/aria/orchestrator/workflows/core/graph_compiler.py
+++ b/aria/orchestrator/workflows/core/graph_compiler.py
@@ -19,8 +19,8 @@ from .. import executor, api
 
 
 class GraphCompiler(object):
-    def __init__(self, ctx, default_executor):
-        self._ctx = ctx
+    def __init__(self, execution, default_executor):
+        self._execution = execution
         self._default_executor = default_executor
         self._stub_executor = executor.base.StubTaskExecutor
         self._model_to_api_id = {}
@@ -65,7 +65,7 @@ class GraphCompiler(object):
         # Insert end marker
         self._create_stub_task(
             end_stub_type,
-            self._get_non_dependent_tasks(self._ctx.execution) or [start_task],
+            self._get_non_dependent_tasks(self._execution) or [start_task],
             self._end_graph_suffix(task_graph.id),
             task_graph.name
         )
@@ -74,17 +74,15 @@ class GraphCompiler(object):
         model_task = models.Task(
             name=name,
             dependencies=dependencies,
-            execution=self._ctx.execution,
+            execution=self._execution,
             _executor=self._stub_executor,
             _stub_type=stub_type)
-        self._ctx.model.task.put(model_task)
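+        # note: the task is not explicitly persisted here; it is attached to the
+        # execution via the relationship and is expected to be stored together
+        # with it by whoever persists the execution (see the callers' updates)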
         self._model_to_api_id[model_task.id] = api_id
         return model_task
 
     def _create_operation_task(self, api_task, dependencies):
         model_task = models.Task.from_api_task(
             api_task, self._default_executor, dependencies=dependencies)
-        self._ctx.model.task.put(model_task)
         self._model_to_api_id[model_task.id] = api_task.id
         return model_task
 
@@ -113,6 +111,6 @@ class GraphCompiler(object):
                 dependency_name = dependency.id
             else:
                 dependency_name = self._end_graph_suffix(dependency.id)
-            tasks.extend(task for task in self._ctx.execution.tasks
+            tasks.extend(task for task in self._execution.tasks
                          if self._model_to_api_id.get(task.id, None) == dependency_name)
         return tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/test_ssh.py
----------------------------------------------------------------------
diff --git a/test_ssh.py b/test_ssh.py
deleted file mode 100644
index 5256cf8..0000000
--- a/test_ssh.py
+++ /dev/null
@@ -1,528 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import contextlib
-import json
-import logging
-import os
-
-import pytest
-
-import fabric.api
-from fabric.contrib import files
-from fabric import context_managers
-
-from aria.modeling import models
-from aria.orchestrator import events
-from aria.orchestrator import workflow
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.exceptions import ExecutorException
-from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
-from aria.orchestrator.execution_plugin import operations
-from aria.orchestrator.execution_plugin import constants
-from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException
-from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations
-
-from tests import mock, storage, resources
-from tests.orchestrator.workflows.helpers import events_collector
-
-_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx'
-
-import tests
-KEY_FILENAME = os.path.join(tests.ROOT_DIR, 'tests/resources/keys/test')
-
-_FABRIC_ENV = {
-    'disable_known_hosts': True,
-    'user': 'test',
-    'key_filename': KEY_FILENAME
-}
-
-
-import mockssh
-
-
-@pytest.fixture(scope='session')
-def server():
-    with mockssh.Server({'test': KEY_FILENAME}) as s:
-        yield s
-
-
-#@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required')
-class TestWithActualSSHServer(object):
-
-    def test_run_script_basic(self):
-        expected_attribute_value = 'some_value'
-        props = self._execute(env={'test_value': expected_attribute_value})
-        assert props['test_value'].value == expected_attribute_value
-
-    @pytest.mark.skip(reason='sudo privileges are required')
-    def test_run_script_as_sudo(self):
-        self._execute(use_sudo=True)
-        with self._ssh_env():
-            assert files.exists('/opt/test_dir')
-            fabric.api.sudo('rm -rf /opt/test_dir')
-
-    def test_run_script_default_base_dir(self):
-        props = self._execute()
-        assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
-
-    @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
-    @pytest.mark.parametrize('hide_groups', [[], ['everything']])
-    def test_run_script_with_hide(self, hide_groups):
-        self._execute(hide_output=hide_groups)
-        output = 'TODO'
-        expected_log_message = ('[localhost] run: source {0}/scripts/'
-                                .format(constants.DEFAULT_BASE_DIR))
-        if hide_groups:
-            assert expected_log_message not in output
-        else:
-            assert expected_log_message in output
-
-    def test_run_script_process_config(self):
-        expected_env_value = 'test_value_env'
-        expected_arg1_value = 'test_value_arg1'
-        expected_arg2_value = 'test_value_arg2'
-        expected_cwd = '/tmp'
-        expected_base_dir = _CUSTOM_BASE_DIR
-        props = self._execute(
-            env={'test_value_env': expected_env_value},
-            process={
-                'args': [expected_arg1_value, expected_arg2_value],
-                'cwd': expected_cwd,
-                'base_dir': expected_base_dir
-            })
-        assert props['env_value'].value == expected_env_value
-        assert len(props['bash_version'].value) > 0
-        assert props['arg1_value'].value == expected_arg1_value
-        assert props['arg2_value'].value == expected_arg2_value
-        assert props['cwd'].value == expected_cwd
-        assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
-
-    def test_run_script_command_prefix(self):
-        props = self._execute(process={'command_prefix': 'bash -i'})
-        assert 'i' in props['dollar_dash'].value
-
-    def test_run_script_reuse_existing_ctx(self):
-        expected_test_value_1 = 'test_value_1'
-        expected_test_value_2 = 'test_value_2'
-        props = self._execute(
-            test_operations=['{0}_1'.format(self.test_name),
-                             '{0}_2'.format(self.test_name)],
-            env={'test_value1': expected_test_value_1,
-                 'test_value2': expected_test_value_2})
-        assert props['test_value1'].value == expected_test_value_1
-        assert props['test_value2'].value == expected_test_value_2
-
-    def test_run_script_download_resource_plain(self, tmpdir):
-        resource = tmpdir.join('resource')
-        resource.write('content')
-        self._upload(str(resource), 'test_resource')
-        props = self._execute()
-        assert props['test_value'].value == 'content'
-
-    def test_run_script_download_resource_and_render(self, tmpdir):
-        resource = tmpdir.join('resource')
-        resource.write('{{ctx.service.name}}')
-        self._upload(str(resource), 'test_resource')
-        props = self._execute()
-        assert props['test_value'].value == self._workflow_context.service.name
-
-    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
-    def test_run_script_inputs_as_env_variables_no_override(self, value):
-        props = self._execute(custom_input=value)
-        return_value = props['test_value'].value
-        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
-        assert value == expected
-
-    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
-    def test_run_script_inputs_as_env_variables_process_env_override(self, value):
-        props = self._execute(custom_input='custom-input-value',
-                              env={'custom_env_var': value})
-        return_value = props['test_value'].value
-        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
-        assert value == expected
-
-    def test_run_script_error_in_script(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskException)
-
-    def test_run_script_abort_immediate(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskAbortException)
-        assert exception.message == 'abort-message'
-
-    def test_run_script_retry(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskRetryException)
-        assert exception.message == 'retry-message'
-
-    def test_run_script_abort_error_ignored_by_script(self):
-        exception = self._execute_and_get_task_exception()
-        assert isinstance(exception, TaskAbortException)
-        assert exception.message == 'abort-message'
-
-    def test_run_commands(self):
-        temp_file_path = '/tmp/very_temporary_file'
-        with self._ssh_env():
-            if files.exists(temp_file_path):
-                fabric.api.run('rm {0}'.format(temp_file_path))
-        self._execute(commands=['touch {0}'.format(temp_file_path)])
-        with self._ssh_env():
-            assert files.exists(temp_file_path)
-            fabric.api.run('rm {0}'.format(temp_file_path))
-
-    @pytest.fixture(autouse=True)
-    def _setup(self, request, workflow_context, executor, capfd, server):
-        self._server = server
-        self._workflow_context = workflow_context
-        self._executor = executor
-        self._capfd = capfd
-        self.test_name = request.node.originalname or request.node.name
-        with self._ssh_env():
-            for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]:
-                if files.exists(directory):
-                    fabric.api.run('rm -rf {0}'.format(directory))
-
-    @contextlib.contextmanager
-    def _ssh_env(self):
-        # the mockssh server fixture is kept on the instance so tests can open
-        # additional SSH sessions without passing it around explicitly
-        with self._capfd.disabled():
-            with context_managers.settings(fabric.api.hide('everything'),
-                                           host_string='localhost:{0}'.format(self._server.port),
-                                           **_FABRIC_ENV):
-                yield
-
-    def _execute(self,
-                 env=None,
-                 use_sudo=False,
-                 hide_output=None,
-                 process=None,
-                 custom_input='',
-                 test_operations=None,
-                 commands=None):
-        process = process or {}
-        if env:
-            process.setdefault('env', {}).update(env)
-
-        test_operations = test_operations or [self.test_name]
-
-        local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh')
-        script_path = os.path.basename(local_script_path)
-        self._upload(local_script_path, script_path)
-
-        if commands:
-            operation = operations.run_commands_with_ssh
-        else:
-            operation = operations.run_script_with_ssh
-
-        node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-        arguments = {
-            'script_path': script_path,
-            'fabric_env': _FABRIC_ENV,
-            'process': process,
-            'use_sudo': use_sudo,
-            'custom_env_var': custom_input,
-            'test_operation': '',
-        }
-        if hide_output:
-            arguments['hide_output'] = hide_output
-        if commands:
-            arguments['commands'] = commands
-        interface = mock.models.create_interface(
-            node.service,
-            'test',
-            'op',
-            operation_kwargs=dict(
-                function='{0}.{1}'.format(
-                    operations.__name__,
-                    operation.__name__),
-                arguments=arguments)
-        )
-        node.interfaces[interface.name] = interface
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            ops = []
-            for test_operation in test_operations:
-                op_arguments = arguments.copy()
-                op_arguments['test_operation'] = test_operation
-                ops.append(api.task.OperationTask(
-                    node,
-                    interface_name='test',
-                    operation_name='op',
-                    arguments=op_arguments))
-
-            graph.sequence(*ops)
-            return graph
-        tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
-        graph_compiler.GraphCompiler(
-            self._workflow_context, self._executor.__class__).compile(tasks_graph)
-        eng = engine.Engine({self._executor.__class__: self._executor})
-        eng.execute(self._workflow_context)
-        return self._workflow_context.model.node.get_by_name(
-            mock.models.DEPENDENCY_NODE_NAME).attributes
-
-    def _execute_and_get_task_exception(self, *args, **kwargs):
-        signal = events.on_failure_task_signal
-        with events_collector(signal) as collected:
-            with pytest.raises(ExecutorException):
-                self._execute(*args, **kwargs)
-        return collected[signal][0]['kwargs']['exception']
-
-    def _upload(self, source, path):
-        self._workflow_context.resource.service.upload(
-            entry_id=str(self._workflow_context.service.id),
-            source=source,
-            path=path)
-
-    @pytest.fixture
-    def executor(self):
-        result = process.ProcessExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @pytest.fixture
-    def workflow_context(self, tmpdir):
-        workflow_context = mock.context.simple(str(tmpdir))
-        workflow_context.states = []
-        workflow_context.exception = None
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-
-class TestFabricEnvHideGroupsAndRunCommands(object):
-
-    def test_fabric_env_default_override(self):
-        # first sanity for no override
-        self._run()
-        assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout']
-        # now override
-        invocation_fabric_env = self.default_fabric_env.copy()
-        timeout = 1000000
-        invocation_fabric_env['timeout'] = timeout
-        self._run(fabric_env=invocation_fabric_env)
-        assert self.mock.settings_merged['timeout'] == timeout
-
-    def test_implicit_host_string(self, mocker):
-        expected_host_address = '1.1.1.1'
-        mocker.patch.object(self._Ctx.task.actor, 'host')
-        mocker.patch.object(self._Ctx.task.actor.host, 'host_address', expected_host_address)
-        fabric_env = self.default_fabric_env.copy()
-        del fabric_env['host_string']
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['host_string'] == expected_host_address
-
-    def test_explicit_host_string(self):
-        fabric_env = self.default_fabric_env.copy()
-        host_string = 'explicit_host_string'
-        fabric_env['host_string'] = host_string
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['host_string'] == host_string
-
-    def test_override_warn_only(self):
-        fabric_env = self.default_fabric_env.copy()
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['warn_only'] is True
-        fabric_env = self.default_fabric_env.copy()
-        fabric_env['warn_only'] = False
-        self._run(fabric_env=fabric_env)
-        assert self.mock.settings_merged['warn_only'] is False
-
-    def test_missing_host_string(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['host_string']
-            self._run(fabric_env=fabric_env)
-        assert '`host_string` not supplied' in str(exc_ctx.value)
-
-    def test_missing_user(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['user']
-            self._run(fabric_env=fabric_env)
-        assert '`user` not supplied' in str(exc_ctx.value)
-
-    def test_missing_key_or_password(self):
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            fabric_env = self.default_fabric_env.copy()
-            del fabric_env['key_filename']
-            self._run(fabric_env=fabric_env)
-        assert 'Access credentials not supplied' in str(exc_ctx.value)
-
-    def test_hide_in_settings_and_non_viable_groups(self):
-        groups = ('running', 'stdout')
-        self._run(hide_output=groups)
-        assert set(self.mock.settings_merged['hide_output']) == set(groups)
-        with pytest.raises(TaskAbortException) as exc_ctx:
-            self._run(hide_output=('running', 'bla'))
-        assert '`hide_output` must be a subset of' in str(exc_ctx.value)
-
-    def test_run_commands(self):
-        def test(use_sudo):
-            commands = ['command1', 'command2']
-            self._run(
-                commands=commands,
-                use_sudo=use_sudo)
-            assert all(item in self.mock.settings_merged.items() for
-                       item in self.default_fabric_env.items())
-            assert self.mock.settings_merged['warn_only'] is True
-            assert self.mock.settings_merged['use_sudo'] == use_sudo
-            assert self.mock.commands == commands
-            self.mock.settings_merged = {}
-            self.mock.commands = []
-        test(use_sudo=False)
-        test(use_sudo=True)
-
-    def test_failed_command(self):
-        with pytest.raises(ProcessException) as exc_ctx:
-            self._run(commands=['fail'])
-        exception = exc_ctx.value
-        assert exception.stdout == self.MockCommandResult.stdout
-        assert exception.stderr == self.MockCommandResult.stderr
-        assert exception.command == self.MockCommandResult.command
-        assert exception.exit_code == self.MockCommandResult.return_code
-
-    class MockCommandResult(object):
-        stdout = 'mock_stdout'
-        stderr = 'mock_stderr'
-        command = 'mock_command'
-        return_code = 1
-
-        def __init__(self, failed):
-            self.failed = failed
-
-    class MockFabricApi(object):
-
-        def __init__(self):
-            self.commands = []
-            self.settings_merged = {}
-
-        @contextlib.contextmanager
-        def settings(self, *args, **kwargs):
-            self.settings_merged.update(kwargs)
-            if args:
-                groups = args[0]
-                self.settings_merged.update({'hide_output': groups})
-            yield
-
-        def run(self, command):
-            self.commands.append(command)
-            self.settings_merged['use_sudo'] = False
-            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
-        def sudo(self, command):
-            self.commands.append(command)
-            self.settings_merged['use_sudo'] = True
-            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')
-
-        def hide(self, *groups):
-            return groups
-
-        def exists(self, *args, **kwargs):
-            raise RuntimeError
-
-    class _Ctx(object):
-        INSTRUMENTATION_FIELDS = ()
-
-        class Task(object):
-            @staticmethod
-            def abort(message=None):
-                models.Task.abort(message)
-            actor = None
-
-        class Actor(object):
-            host = None
-
-        class Model(object):
-            @contextlib.contextmanager
-            def instrument(self, *args, **kwargs):
-                yield
-        task = Task
-        task.actor = Actor
-        model = Model()
-        logger = logging.getLogger()
-
-    @staticmethod
-    @contextlib.contextmanager
-    def _mock_self_logging(*args, **kwargs):
-        yield
-    _Ctx.logging_handlers = _mock_self_logging
-
-    @pytest.fixture(autouse=True)
-    def _setup(self, mocker):
-        self.default_fabric_env = {
-            'host_string': 'test',
-            'user': 'test',
-            'key_filename': 'test',
-        }
-        self.mock = self.MockFabricApi()
-        mocker.patch('fabric.api', self.mock)
-
-    def _run(self,
-             commands=(),
-             fabric_env=None,
-             process=None,
-             use_sudo=False,
-             hide_output=None):
-        operations.run_commands_with_ssh(
-            ctx=self._Ctx,
-            commands=commands,
-            process=process,
-            fabric_env=fabric_env or self.default_fabric_env,
-            use_sudo=use_sudo,
-            hide_output=hide_output)
-
-
-class TestUtilityFunctions(object):
-
-    def test_paths(self):
-        base_dir = '/path'
-        local_script_path = '/local/script/path.py'
-        paths = ssh_operations._Paths(base_dir=base_dir,
-                                      local_script_path=local_script_path)
-        assert paths.local_script_path == local_script_path
-        assert paths.remote_ctx_dir == base_dir
-        assert paths.base_script_path == 'path.py'
-        assert paths.remote_ctx_path == '/path/ctx'
-        assert paths.remote_scripts_dir == '/path/scripts'
-        assert paths.remote_work_dir == '/path/work'
-        assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-')
-        assert paths.remote_script_path.startswith('/path/scripts/path.py-')
-
-    def test_write_environment_script_file(self):
-        base_dir = '/path'
-        local_script_path = '/local/script/path.py'
-        paths = ssh_operations._Paths(base_dir=base_dir,
-                                      local_script_path=local_script_path)
-        env = {'one': "'1'"}
-        local_socket_url = 'local_socket_url'
-        remote_socket_url = 'remote_socket_url'
-        env_script_lines = set([l for l in ssh_operations._write_environment_script_file(
-            process={'env': env},
-            paths=paths,
-            local_socket_url=local_socket_url,
-            remote_socket_url=remote_socket_url
-        ).getvalue().split('\n') if l])
-        expected_env_script_lines = set([
-            'export PATH=/path:$PATH',
-            'export PYTHONPATH=/path:$PYTHONPATH',
-            'chmod +x /path/ctx',
-            'chmod +x {0}'.format(paths.remote_script_path),
-            'export CTX_SOCKET_URL={0}'.format(remote_socket_url),
-            'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url),
-            'export one=\'1\''
-        ])
-        assert env_script_lines == expected_env_script_lines

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 780db07..d0b85d3 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -26,7 +26,7 @@ def op_path(func, module_path=None):
 def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
-    graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
+    graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
     eng = engine.Engine(executors={executor.__class__: executor})
 
     eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 091e23c..8e08e72 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
     context.model.node.update(node)
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
+    graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph)
     eng = engine.Engine({executor.__class__: executor})
     eng.execute(context)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/__init__.py b/tests/orchestrator/execution/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/execution/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/test_execution_compiler.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py
new file mode 100644
index 0000000..b044872
--- /dev/null
+++ b/tests/orchestrator/execution/test_execution_compiler.py
@@ -0,0 +1,698 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import time
+from threading import Thread, Event
+from datetime import datetime
+
+import mock
+import pytest
+
+from aria.modeling import exceptions as modeling_exceptions
+from aria.modeling import models
+from aria.orchestrator import exceptions
+from aria.orchestrator import events
+from aria.orchestrator import execution as orch_execution
+from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine, graph_compiler
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+    workflow,
+    operation,
+)
+
+from tests import (
+    mock as tests_mock,
+    storage
+)
+
+from ...fixtures import (  # pylint: disable=unused-import
+    plugins_dir,
+    plugin_manager,
+    fs_model as model,
+    resource_storage as resource
+)
+
+custom_events = {
+    'is_resumed': Event(),
+    'is_active': Event(),
+    'execution_cancelled': Event(),
+    'execution_failed': Event(),
+}
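+# these module-level events let the mock operations below, the test threads and
+# the engine's signal handlers synchronize with one another; the
+# register_to_events fixture clears all of them after every test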
+
+
+class TimeoutError(BaseException):
+    pass
+
+
+class FailingTask(BaseException):
+    pass
+
+
+def test_undeclared_workflow(request):
+    # validating a proper error is raised when the workflow is not declared in the service
+    with pytest.raises(exceptions.UndeclaredWorkflowError):
+        _get_compiler(request, 'undeclared_workflow').compile()
+
+
+def test_missing_workflow_implementation(service, request):
+    # validating a proper error is raised when the workflow code path does not exist
+    workflow = models.Operation(
+        name='test_workflow',
+        service=service,
+        function='nonexistent.workflow.implementation')
+    service.workflows['test_workflow'] = workflow
+
+    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
+        _get_compiler(request, 'test_workflow').compile()
+
+
+def test_builtin_workflow_instantiation(request, model):
+    # validates the execution compiler works properly when provided with a builtin workflow
+    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+    execution = _get_compiler(request, 'install').compile()
+    assert len(execution.tasks) == 18  # expecting 18 tasks for 2 node topology
+
+
+def test_custom_workflow_instantiation(request):
+    # validates the execution compiler works properly when provided with a custom workflow
+    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
+    mock_workflow = _setup_mock_workflow_in_service(request)
+    execution = _get_compiler(request, mock_workflow).compile()
+    # the mock workflow produces only the start and end stub tasks
+    assert len(execution.tasks) == 2
+
+
+def test_existing_active_executions(request, service, model):
+    existing_active_execution = models.Execution(
+        service=service,
+        status=models.Execution.STARTED,
+        workflow_name='uninstall')
+    model.execution.put(existing_active_execution)
+    with pytest.raises(exceptions.ActiveExecutionsError):
+        _get_compiler(request, 'install').compile()
+
+
+def test_existing_executions_but_no_active_ones(request, service, model):
+    existing_terminated_execution = models.Execution(
+        service=service,
+        status=models.Execution.SUCCEEDED,
+        workflow_name='uninstall')
+    model.execution.put(existing_terminated_execution)
+    # no active executions exist, so no error should be raised
+    _get_compiler(request, 'install').compile()
+
+
+def test_default_executor(request):
+    # validates the ProcessExecutor is used by the execution runner by default
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
+        execution = _get_compiler(request, mock_workflow).compile()
+        orch_execution.ExecutionRunner(ProcessExecutor())
+        _, engine_kwargs = mock_engine_cls.call_args
+        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
+
+
+def test_custom_executor(request):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    custom_executor = mock.MagicMock()
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls:
+        execution = _get_compiler(request, mock_workflow).compile(executor=custom_executor)
+        orch_execution.ExecutionRunner(custom_executor)
+        _, engine_kwargs = mock_engine_cls.call_args
+        assert engine_kwargs.get('executors').values()[0] == custom_executor
+
+
+def test_task_configuration_parameters(request):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    task_max_attempts = 5
+    task_retry_interval = 7
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute') as \
+            mock_engine_execute:
+        compiler = _get_compiler(request, mock_workflow)
+        execution = compiler.compile(task_max_attempts=task_max_attempts,
+                                     task_retry_interval=task_retry_interval)
+        orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx)
+        _, engine_kwargs = mock_engine_execute.call_args
+        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
+        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
+
+
+def test_execute(request, service):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    mock_engine = mock.MagicMock()
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute',
+                    return_value=mock_engine) as mock_engine_execute:
+        compiler = _get_compiler(request, mock_workflow)
+        compiler.compile()
+
+        runner = orch_execution.runner.ExecutionRunner(ProcessExecutor())
+        runner.execute(compiler.workflow_ctx)
+
+        _, engine_kwargs = mock_engine_execute.call_args
+        assert engine_kwargs['ctx'].service.id == service.id
+        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
+
+        mock_engine_execute.assert_called_once_with(ctx=compiler.workflow_ctx,
+                                                    resuming=False,
+                                                    retry_failed=False)
+
+
+def test_cancel_execution(request):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    mock_engine = mock.MagicMock()
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine', return_value=mock_engine):
+        compiler = _get_compiler(request, mock_workflow)
+        execution = compiler.compile()
+
+        runner = orch_execution.ExecutionRunner(ProcessExecutor())
+        runner.cancel(ctx=compiler.workflow_ctx)
+        mock_engine.cancel_execution.assert_called_once_with(compiler.workflow_ctx)
+
+
+def test_execution_model_creation(request, service):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
+        execution = _get_compiler(request, mock_workflow).compile()
+
+        assert execution.service.id == service.id
+        assert execution.workflow_name == mock_workflow
+        assert execution.created_at <= datetime.utcnow()
+        assert execution.inputs == dict()
+
+
+def test_execution_inputs_override_workflow_inputs(request):
+    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
+    mock_workflow = _setup_mock_workflow_in_service(
+        request,
+        inputs=dict((name, models.Input.wrap(name, val)) for name, val
+                    in wf_inputs.iteritems()))
+
+    with mock.patch('aria.orchestrator.execution.runner.engine.Engine'):
+        execution = _get_compiler(request, mock_workflow).compile(
+            execution_inputs={'input2': 'overriding-value2', 'input3': 7}
+        )
+
+        assert len(execution.inputs) == 3
+        # did not override input1 - expecting the default value from the workflow inputs
+        assert execution.inputs['input1'].value == 'value1'
+        # overrode input2
+        assert execution.inputs['input2'].value == 'overriding-value2'
+        # overrode input of integer type
+        assert execution.inputs['input3'].value == 7
+
+
+def test_execution_inputs_undeclared_inputs(request):
+    mock_workflow = _setup_mock_workflow_in_service(request)
+
+    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+        _get_compiler(request, mock_workflow).compile(
+            execution_inputs={'undeclared_input': 'value'})
+
+
+def test_execution_inputs_missing_required_inputs(request):
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
+
+    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
+        _get_compiler(request, mock_workflow).compile(execution_inputs={})
+
+
+def test_execution_inputs_wrong_type_inputs(request):
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs={'input': models.Input.wrap('input', 'value')})
+
+    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
+        _get_compiler(request, mock_workflow).compile(execution_inputs={'input': 5})
+
+
+def test_execution_inputs_builtin_workflow_with_inputs(request):
+    # built-in workflows don't have inputs
+    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
+        _get_compiler(request, 'install').compile(execution_inputs={'undeclared_input': 'value'})
+
+
+def test_workflow_function_parameters(request, tmpdir):
+    # validating the workflow function is passed with the
+    # merged execution inputs, in dict form
+
+    # the workflow function parameters will be written to this file
+    output_path = str(tmpdir.join('output'))
+    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
+
+    mock_workflow = _setup_mock_workflow_in_service(
+        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
+                             in wf_inputs.iteritems()))
+
+    _get_compiler(request, mock_workflow).compile(
+        execution_inputs={'input2': 'overriding-value2', 'input3': 7})
+
+    with open(output_path) as f:
+        wf_call_kwargs = json.load(f)
+    assert len(wf_call_kwargs) == 3
+    assert wf_call_kwargs.get('input1') == 'value1'
+    assert wf_call_kwargs.get('input2') == 'overriding-value2'
+    assert wf_call_kwargs.get('input3') == 7
+
+
+@pytest.fixture
+def service(model):
+    # sets up a service in the storage
+    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
+    service = model.service.get(service_id)
+    return service
+
+
+def _setup_mock_workflow_in_service(request, inputs=None):
+    # sets up a mock workflow as part of the service, including uploading
+    # the workflow code to the service's dir on the resource storage
+    service = request.getfixturevalue('service')
+    resource = request.getfixturevalue('resource')
+
+    source = tests_mock.workflow.__file__
+    resource.service_template.upload(str(service.service_template.id), source)
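+    # the compiler is expected to import 'workflow.mock_workflow' (the
+    # 'function' attribute below) from this uploaded copy of the mock module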
+    mock_workflow_name = 'test_workflow'
+    arguments = {}
+    if inputs:
+        for input in inputs.itervalues():
+            arguments[input.name] = input.as_argument()
+    workflow = models.Operation(
+        name=mock_workflow_name,
+        service=service,
+        function='workflow.mock_workflow',
+        inputs=inputs or {},
+        arguments=arguments)
+    service.workflows[mock_workflow_name] = workflow
+    return mock_workflow_name
+
+
+def _get_compiler(request, workflow_name):
+    # helper method for instantiating an execution compiler
+    service = request.getfixturevalue('service')
+    model = request.getfixturevalue('model')
+    resource = request.getfixturevalue('resource')
+    plugin_manager = request.getfixturevalue('plugin_manager')
+
+    return orch_execution.ExecutionCompiler(
+        model,
+        resource,
+        plugin_manager,
+        service,
+        workflow_name
+    )
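+
+# a typical flow through these helpers, as exercised by the tests above (sketch):
+#     compiler = _get_compiler(request, 'install')
+#     execution = compiler.compile()  # validates the workflow and creates the execution
+#     orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx)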
+
+
+class TestResumableWorkflows(object):
+
+    def _create_initial_workflow_runner(
+            self,
+            model,
+            resource,
+            service,
+            workflow,
+            executor,
+            inputs=None):
+
+        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+            'custom_workflow',
+            operation_kwargs={
+                'function': '{0}.{1}'.format(__name__, workflow.__name__),
+                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+            }
+        )
+        model.service.update(service)
+        compiler = orch_execution.ExecutionCompiler(
+            model, resource, None, service, 'custom_workflow'
+        )
+        execution = compiler.compile(inputs, executor)
+        model.execution.update(execution)
+
+        return orch_execution.ExecutionRunner(executor), compiler.workflow_ctx
+
+    @staticmethod
+    def _wait_for_active_and_cancel(execution_runner, ctx):
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("is_active wasn't set to True")
+        execution_runner.cancel(ctx)
+        if custom_events['execution_cancelled'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+    def test_resume_workflow(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
+        runner, ctx = self._create_initial_workflow_runner(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'number_of_tasks': 2}
+        )
+
+        wf_thread = Thread(target=runner.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        # Wait for the execution to start
+        self._wait_for_active_and_cancel(runner, ctx)
+        node = ctx.model.node.refresh(node)
+
+        tasks = ctx.model.task.list(filters={'_stub_type': None})
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert any(task.status == task.RETRYING for task in tasks)
+        custom_events['is_resumed'].set()
+        assert any(task.status == task.RETRYING for task in tasks)
+
+        # Create a new execution runner that resumes the existing execution,
+        # causing the old execution to restart.
+        new_wf_runner = orch_execution.ExecutionRunner(thread_executor, resume=True)
+        new_wf_runner.execute(ctx)
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert node.attributes['invocations'].value == 3
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_started_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_stuck_task)
+
+        wf_runner, ctx = self._create_initial_workflow_runner(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'number_of_tasks': 1}
+        )
+
+        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner, ctx)
+        node = workflow_context.model.node.refresh(node)
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 1
+        assert task.status == task.STARTED
+        assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+        custom_events['is_resumed'].set()
+
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True)
+
+            new_wf_runner.execute(ctx)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.SUCCESS
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_failed_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_before_resuming)
+
+        wf_runner, ctx = self._create_initial_workflow_runner(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor)
+        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        self._wait_for_active_and_cancel(wf_runner, ctx)
+        node = workflow_context.model.node.refresh(node)
+
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert node.attributes['invocations'].value == 2
+        assert task.status == task.STARTED
+        assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING)
+
+        custom_events['is_resumed'].set()
+        assert node.attributes['invocations'].value == 2
+
+        # Create a new execution runner that resumes the existing execution,
+        # causing the old execution to restart.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True)
+
+            new_wf_runner.execute(ctx)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == task.max_attempts - 1
+        assert task.status == task.SUCCESS
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
+
+        wf_runner, ctx = self._create_initial_workflow_runner(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 3
+        failed_task = [t for t in tasks if t.status == t.FAILED][0]
+
+        # one task fails, after exhausting its two attempts
+        assert any(task.status == task.FAILED for task in tasks)
+        assert failed_task.attempts_count == 2
+        # and the other task passes
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert ctx.execution.status == ctx.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True, True)
+            new_wf_runner.execute(ctx)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert failed_task.attempts_count == 1
+        assert node.attributes['invocations'].value == 4
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+        execution_runner, ctx = self._create_initial_workflow_runner(
+            workflow_context.model,
+            workflow_context.resource,
+            workflow_context.model.service.list()[0],
+            mock_sequential_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=execution_runner.execute, kwargs=dict(ctx=ctx))
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 1
+        assert any(t.status == t.FAILED for t in tasks)
+        assert any(t.status == t.PENDING for t in tasks)
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True)
+            new_wf_runner.execute(ctx)
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert any(t.status == t.SUCCESS for t in tasks)
+        assert any(t.status == t.FAILED for t in tasks)
+        assert ctx.execution.status == ctx.execution.SUCCEEDED
+
+    @staticmethod
+    @pytest.fixture
+    def thread_executor():
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @staticmethod
+    @pytest.fixture
+    def workflow_context(tmpdir):
+        workflow_context = tests_mock.context.simple(str(tmpdir))
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+    @staticmethod
+    def _create_interface(ctx, node, func, arguments=None):
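+        # Attach a lifecycle interface whose ``create`` operation points at
+        # ``func`` in this test module.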
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if arguments:
+            # the operation has to declare the arguments before they can be passed
+            operation_kwargs['arguments'] = arguments
+        operation_name = 'create'
+        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+                                                       operation_kwargs=operation_kwargs)
+        node.interfaces[interface.name] = interface
+        ctx.model.node.update(node)
+
+        return node, interface_name, operation_name
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor):
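+        # Compile the API-level task graph into the stored execution model,
+        # then build an engine keyed by the executor's class.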
+        graph = workflow_func(ctx=workflow_context)
+        execution = workflow_context.execution
+        graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph)
+        workflow_context.execution = execution
+
+        return engine.Engine(executors={executor.__class__: executor})
+
+    @pytest.fixture(autouse=True)
+    def register_to_events(self):
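+        # Bridge workflow-level signals into module-level events so the tests
+        # can synchronize with the engine thread.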
+        def execution_cancelled(*args, **kwargs):
+            custom_events['execution_cancelled'].set()
+
+        def execution_failed(*args, **kwargs):
+            custom_events['execution_failed'].set()
+
+        events.on_cancelled_workflow_signal.connect(execution_cancelled)
+        events.on_failure_workflow_signal.connect(execution_failed)
+        yield
+        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+        events.on_failure_workflow_signal.disconnect(execution_failed)
+        for event in custom_events.values():
+            event.clear()
+
+
+@workflow
+def mock_sequential_tasks_workflow(ctx, graph,
+                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+@workflow
+def mock_parallel_tasks_workflow(ctx, graph,
+                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
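+# Build ``number_of_tasks`` identical operation tasks against the node's
+# lifecycle ``create`` operation; the calling workflow decides whether they
+# run sequentially or in parallel.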
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+    return [
+        api.task.OperationTask(node,
+                               'aria.interfaces.lifecycle',
+                               'create',
+                               retry_interval=retry_interval,
+                               max_attempts=max_attempts)
+        for _ in xrange(number_of_tasks)
+    ]
+
+
+@operation
+def mock_failed_before_resuming(ctx):
+    """
+    The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
+    overall, the number of invocations should be ctx.task.max_attempts - 1
+    """
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] == 2:
+        custom_events['is_active'].set()
+        # stay blocked until the rest of the invocations have finished
+        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
+            time.sleep(5)
+
+    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
+        # pass only on the final expected invocation
+        return
+    else:
+        # fail otherwise
+        raise FailingTask("stop this task")
+
+
+@operation
+def mock_stuck_task(ctx):
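+    # Record the invocation, then spin until the test signals a resume;
+    # setting ``is_active`` tells the test the task has started.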
+    ctx.node.attributes['invocations'] += 1
+    while not custom_events['is_resumed'].is_set():
+        if not custom_events['is_active'].is_set():
+            custom_events['is_active'].set()
+        time.sleep(5)
+
+
+@operation
+def mock_pass_first_task_only(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] != 1:
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].is_set():
+            # the second task should keep failing for as long as the workflow
+            # has not been resumed
+            raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
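+    # Fail only the very first invocation of a non-resumed run; every later
+    # invocation passes.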
+    ctx.node.attributes['invocations'] += 1
+
+    if not custom_events['is_resumed'].is_set() and ctx.node.attributes['invocations'] == 1:
+        raise FailingTask("First task should fail")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 7f33318..467ed36 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,7 +500,8 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph)
+        graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(
+            tasks_graph)
         eng = engine.Engine({executor.__class__: executor})
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index b5df939..8992a04 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -262,7 +262,7 @@ class TestWithActualSSHServer(object):
             return graph
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
         graph_compiler.GraphCompiler(
-            self._workflow_context, self._executor.__class__).compile(tasks_graph)
+            self._workflow_context.execution, self._executor.__class__).compile(tasks_graph)
         eng = engine.Engine({self._executor.__class__: self._executor})
         eng.execute(self._workflow_context)
         return self._workflow_context.model.node.get_by_name(